使用blockingcollection和tasks .net 4 TPL的经典生产者消费者模式
请参阅下面的伪代码
//Single or multiple Producers produce using below method void Produce(object itemToQueue) { concurrentQueue.enqueue(itemToQueue); consumerSignal.set; } //somewhere else we have started a consumer like this //we have only one consumer void StartConsumer() { while (!concurrentQueue.IsEmpty()) { if (concurrentQueue.TrydeQueue(out item)) { //long running processing of item } } consumerSignal.WaitOne(); }
我如何移植我从远古时代以来使用的模式来使用taskfactory创建的任务和net 4的新信号function。换句话说,如果有人用net 4编写这个模式,它会是什么样子? 伪代码很好。 我已经使用了.net 4 concurrentQueue了。 如何使用任务并可能使用一些较新的信令机制。 谢谢
感谢Jon / Dan解决我的问题。 甜。 没有手动信号或while(true)或while(itemstoProcess)类型循环像过去那样
//Single or multiple Producers produce using below method void Produce(object itemToQueue) { blockingCollection.add(item); } //somewhere else we have started a consumer like this //this supports multiple consumers ! task(StartConsuming()).Start; void StartConsuming() { foreach (object item in blockingCollection.GetConsumingEnumerable()) { //long running processing of item } } cancellations are handled using cancel tokens
你会使用BlockingCollection
。 文档中有一个例子。
该课程专门设计用于使这一点变得微不足道。
你的第二块代码看起来更好。 但是,启动Task
然后立即等待它是没有意义的。 只需调用Take
,然后处理直接在消费线程上返回的项目。 这就是生产者 – 消费者模式的意图。 如果您认为工作项目的处理足够密集以保证更多的消费者,那么无论如何都要吸引更多的消费者。 BlockingCollection
是安全的多个生产者和多个消费者。
public class YourCode { private BlockingCollection
我之前使用过一种模式,创建了一种“按需”队列消费者(基于使用ConcurrentQueue消费):
private void FireAndForget(Action fire) { _firedEvents.Enqueue(fire); lock (_taskLock) { if (_launcherTask == null) { _launcherTask = new Task(LaunchEvents); _launcherTask.ContinueWith(EventsComplete); _launcherTask.Start(); } } } private void LaunchEvents() { Action nextEvent; while (_firedEvents.TryDequeue(out nextEvent)) { if (_synchronized) { var syncEvent = nextEvent; _mediator._syncContext.Send(state => syncEvent(), null); } else { nextEvent(); } lock (_taskLock) { if (_firedEvents.Count == 0) { _launcherTask = null; break; } } } } private void EventsComplete(Task task) { if (task.IsFaulted && task.Exception != null) { // Do something with task Exception here } }