与BlockingCollection集成时,Parallel.ForEach停滞不前

我根据这个问题中的代码采用了我的并行/消费者实现

class ParallelConsumer : IDisposable { private readonly int _maxParallel; private readonly Action _action; private readonly TaskFactory _factory = new TaskFactory(); private CancellationTokenSource _tokenSource; private readonly BlockingCollection _entries = new BlockingCollection(); private Task _task; public ParallelConsumer(int maxParallel, Action action) { _maxParallel = maxParallel; _action = action; } public void Start() { try { _tokenSource = new CancellationTokenSource(); _task = _factory.StartNew( () => { Parallel.ForEach( _entries.GetConsumingEnumerable(), new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token }, (item, loopState) => { Log("Taking" + item); if (!_tokenSource.IsCancellationRequested) { _action(item); Log("Finished" + item); } else { Log("Not Taking" + item); _entries.CompleteAdding(); loopState.Stop(); } }); }, _tokenSource.Token); } catch (OperationCanceledException oce) { System.Diagnostics.Debug.WriteLine(oce); } } private void Log(string message) { Console.WriteLine(message); } public void Stop() { Dispose(); } public void Enqueue(T entry) { Log("Enqueuing" + entry); _entries.Add(entry); } public void Dispose() { if (_task == null) { return; } _tokenSource.Cancel(); while (!_task.IsCanceled) { } _task.Dispose(); _tokenSource.Dispose(); _task = null; } } 

这是一个测试代码

 class Program { static void Main(string[] args) { TestRepeatedEnqueue(100, 1); } private static void TestRepeatedEnqueue(int itemCount, int parallelCount) { bool[] flags = new bool[itemCount]; var consumer = new ParallelConsumer(parallelCount, (i) => { flags[i] = true; } ); consumer.Start(); for (int i = 0; i  b == true)); } } 

测试总是失败 – 它总是停留在测试的100个项目的第93项。 知道我的代码的哪一部分导致了这个问题,以及如何解决它?

正如您所发现的,您不能将Parallel.Foreach()BlockingCollection.GetConsumingEnumerable()一起使用。

有关解释,请参阅此博客文章:

http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx

该博客还提供了一个名为GetConsumingPartitioner()的方法的源代码,您可以使用它来解决问题。

摘自博客:

BlockingCollection的GetConsumingEnumerable实现正在使用BlockingCollection的内部同步,它同时支持多个使用者,但ForEach不知道这一点,并且其可枚举分区逻辑在访问可枚举时也需要锁定。

因此,这里的同步比实际需要的更多,导致潜在的不可忽略的性能损失。

[另外] Parallel.ForEach和PLINQ默认使用的分区算法使用分块以最小化同步成本:而不是每个元素锁定一次,它将获取锁定,获取一组元素(一个块) ,然后释放锁。

虽然这种设计可以帮助提高整体吞吐量,但对于更注重低延迟的场景,这种分块可能会让人望而却步。

失败的原因是由于以下原因所解释的

Parallel.ForEach和PLINQ默认使用的分区算法使用分块以最小化同步成本:而不是每个元素锁定一次,它将获取锁定,获取一组元素(一个块),然后释放锁。

要使其工作,您可以在ParallelConsumer类上添加一个方法,以指示添加已完成,如下所示

  public void StopAdding() { _entries.CompleteAdding(); } 

现在在for loop之后调用此方法,如下所示

  consumer.Start(); for (int i = 0; i < itemCount; i++) { consumer.Enqueue(i); } consumer.StopAdding(); 

否则, Parallel.ForEach()将等待达到阈值,以便获取块并开始处理。