如何使用C#任务并行库和IProducerConsumerCollection实现通用回调?

我有一个组件向基于Web的API提交请求,但必须限制这些请求,以免违反API的数据限制。 这意味着所有请求必须通过队列来控制它们的提交速率,但它们可以(并且应该)同时执行以实现最大吞吐量。 每个请求必须在完成后的某个时刻将某些数据返回给调用代码。

我正在努力创建一个很好的模型来处理数据的返回。

使用BlockingCollection我不能只从Schedule方法返回Task ,因为入队和出队进程位于缓冲区的两端。 因此,我创建了一个RequestItem类型,其中包含Action<Task>forms的回调。

这个想法是,一旦一个项目被从队列中拉出,就可以用启动的任务调用回调,但是我已经丢失了那个点的generics类型参数,而且我还在使用reflection和各种恶作剧(如果它是甚至可能)。

例如:

 public class RequestScheduler { private readonly BlockingCollection _queue = new BlockingCollection(); public RequestScheduler() { this.Start(); } // This can't return Task, so returns void. // Instead RequestItem is generic but this poses problems when adding to the queue public void Schedule(RequestItem request) { _queue.Add(request); } private void Start() { Task.Factory.StartNew(() => { foreach (var item in _queue.GetConsumingEnumerable()) { // I want to be able to use the original type parameters here // is there a nice way without reflection? // ProcessItem submits an HttpWebRequest Task.Factory.StartNew(() => ProcessItem(item)) .ContinueWith(t => { item.Callback(t); }); } }); } public void Stop() { _queue.CompleteAdding(); } } public class RequestItem : IRequestItem { public IOperation Operation { get; set; } public Action<Task> Callback { get; set; } } 

如何从缓冲区中提取请求并将其提交给API,如何继续缓冲我的请求,但将Task返回给客户端?

首先,您可以Schedule()返回Task ,您只需要使用TaskCompletionSource

其次,为了解决通用性问题,你可以隐藏所有内容(非generics) Action 。 在Schedule() ,使用完全符合您需要的lambda创建一个动作。 消费循环然后将执行该动作,它不需要知道内部是什么。

第三,我不明白你为什么要在循环的每次迭代中开始一个新的Task 。 首先,它意味着你实际上不会受到任何限制。

通过这些修改,代码可能如下所示:

 public class RequestScheduler { private readonly BlockingCollection m_queue = new BlockingCollection(); public RequestScheduler() { this.Start(); } private void Start() { Task.Factory.StartNew(() => { foreach (var action in m_queue.GetConsumingEnumerable()) { action(); } }, TaskCreationOptions.LongRunning); } public Task Schedule(IOperation operation) { var tcs = new TaskCompletionSource(); Action action = () => { try { tcs.SetResult(ProcessItem(operation)); } catch (Exception e) { tcs.SetException(e); } }; m_queue.Add(action); return tcs.Task; } private T ProcessItem(IOperation operation) { // whatever } }