为什么Parallel.Foreach会创建无穷无尽的线程?

下面的代码继续创建线程,即使队列是空的。最终会发生OutOfMemoryexception。 如果我用常规foreach替换Parallel.ForEach,则不会发生这种情况。 有人知道为什么会这样吗?

public delegate void DataChangedDelegate(DataItem obj); public class Consumer { public DataChangedDelegate OnCustomerChanged; public DataChangedDelegate OnOrdersChanged; private CancellationTokenSource cts; private CancellationToken ct; private BlockingCollection queue; public Consumer(BlockingCollection queue) { this.queue = queue; Start(); } private void Start() { cts = new CancellationTokenSource(); ct = cts.Token; Task.Factory.StartNew(() => DoWork(), ct); } private void DoWork() { Parallel.ForEach(queue.GetConsumingPartitioner(), item => { if (item.DataType == DataTypes.Customer) { OnCustomerChanged(item); } else if(item.DataType == DataTypes.Order) { OnOrdersChanged(item); } }); } } 

我认为Parallel.ForEach()主要用于处理有界集合。 并且它不期望像GetConsumingPartitioner()返回的GetConsumingPartitioner() ,其中MoveNext()长时间阻塞。

问题是Parallel.ForEach()试图找到最佳的并行度,因此它启动Task因为TaskScheduler允许它运行。 但是TaskScheduler看到有很多Task需要很长时间才能完成,并且他们没有做任何事情(他们阻止)所以它继续开始新的任务。

我认为最好的解决方案是设置MaxDegreeOfParallelism

作为替代方案,您可以使用TPL Dataflow的ActionBlock 。 这种情况的主要区别在于,当没有要处理的项时, ActionBlock不会阻塞任何线程,因此线程数不会接近限制。

生产者/消费者模式主要在只有一个生产者和一个消费者时使用。

但是,您尝试实现的目标(多个消费者)更适合工作清单模式。 下面的代码来自于犹他大学教授的并行编程课程的unit2幻灯片“2c – 共享内存模式”的幻灯片,可从http://ppcp.codeplex.com/下载。

 BlockingCollection workList; CancellationTokenSource cts; int itemcount public void Run() { int num_workers = 4; //create worklist, filled with initial work worklist = new BlockingCollection( new ConcurrentQueue(GetInitialWork())); cts = new CancellationTokenSource(); itemcount = worklist.Count(); for( int i = 0; i < num_workers; i++) Task.Factory.StartNew( RunWorker ); } IEnumberable GetInitialWork() { ... } public void RunWorker() { try { do { Item i = worklist.Take( cts.Token ); //blocks until item available or cancelled Process(i); //exit loop if no more items left } while (Interlocked.Decrement( ref itemcount) > 0); } finally { if( ! cts.IsCancellationRequested ) cts.Cancel(); } } } public void AddWork( Item item) { Interlocked.Increment( ref itemcount ); worklist.Add(item); } public void Process( Item i ) { //Do what you want to the work item here. } 

上面的代码允许您将工作列表项添加到队列中,并允许您设置任意数量的工作程序(在本例中为4)以将项目从队列中拉出并处理它们。

.Net 4.0上的Parallelism的另一个重要资源是“使用Microsoft .Net进行并行编程”一书,可从以下url免费获取: http : //msdn.microsoft.com/en-us/library/ff963553

在任务并行库的内部,Parallel.For和Parallel.Foreach遵循爬山算法来确定应该使用多少并行度来进行操作。

或多或少,他们开始在一个任务上运行正文,移动到两个,依此类推,直到达到一个断点,他们需要减少任务数量。

这对于快速完成的方法体来说非常有效,但如果身体需要很长时间才能运行,它可能需要很长时间才能实现它需要减少并行度。 在此之前,它会继续添加任务,并可能导致计算机崩溃。

我在任务并行库的一位开发人员的演讲中学到了上述内容。

指定MaxDegreeOfParallelism可能是最简单的方法。