使用blockingcollection和tasks .net 4 TPL的经典生产者消费者模式

请参阅下面的伪代码

//Single or multiple Producers produce using below method void Produce(object itemToQueue) { concurrentQueue.enqueue(itemToQueue); consumerSignal.set; } //somewhere else we have started a consumer like this //we have only one consumer void StartConsumer() { while (!concurrentQueue.IsEmpty()) { if (concurrentQueue.TrydeQueue(out item)) { //long running processing of item } } consumerSignal.WaitOne(); } 

我如何移植我从远古时代以来使用的模式来使用taskfactory创建的任务和net 4的新信号function。换句话说,如果有人用net 4编写这个模式,它会是什么样子? 伪代码很好。 我已经使用了.net 4 concurrentQueue了。 如何使用任务并可能使用一些较新的信令机制。 谢谢

感谢Jon / Dan解决我的问题。 甜。 没有手动信号或while(true)或while(itemstoProcess)类型循环像过去那样

 //Single or multiple Producers produce using below method void Produce(object itemToQueue) { blockingCollection.add(item); } //somewhere else we have started a consumer like this //this supports multiple consumers ! task(StartConsuming()).Start; void StartConsuming() { foreach (object item in blockingCollection.GetConsumingEnumerable()) { //long running processing of item } } cancellations are handled using cancel tokens 

你会使用BlockingCollection 。 文档中有一个例子。

该课程专门设计用于使这一点变得微不足道。

你的第二块代码看起来更好。 但是,启动Task然后立即等待它是没有意义的。 只需调用Take ,然后处理直接在消费线程上返回的项目。 这就是生产者 – 消费者模式的意图。 如果您认为工作项目的处理足够密集以保证更多的消费者,那么无论如何都要吸引更多的消费者。 BlockingCollection是安全的多个生产者多个消费者。

 public class YourCode { private BlockingCollection queue = new BlockingCollection(); public YourCode() { var thread = new Thread(StartConsuming); thread.IsBackground = true; thread.Start(); } public void Produce(object item) { queue.Add(item); } private void StartConsuming() { while (true) { object item = queue.Take(); // Add your code to process the item here. // Do not start another task or thread. } } } 

我之前使用过一种模式,创建了一种“按需”队列消费者(基于使用ConcurrentQueue消费):

  private void FireAndForget(Action fire) { _firedEvents.Enqueue(fire); lock (_taskLock) { if (_launcherTask == null) { _launcherTask = new Task(LaunchEvents); _launcherTask.ContinueWith(EventsComplete); _launcherTask.Start(); } } } private void LaunchEvents() { Action nextEvent; while (_firedEvents.TryDequeue(out nextEvent)) { if (_synchronized) { var syncEvent = nextEvent; _mediator._syncContext.Send(state => syncEvent(), null); } else { nextEvent(); } lock (_taskLock) { if (_firedEvents.Count == 0) { _launcherTask = null; break; } } } } private void EventsComplete(Task task) { if (task.IsFaulted && task.Exception != null) { // Do something with task Exception here } }