为什么迭代GetConsumingEnumerable()不会完全清空底层的阻塞集合

在尝试创建简单管道时,使用任务并行库, BlockingCollectionConcurrentQueueGetConsumingEnumerable ,我有一个可量化且可重复的问题。

简而言之,将条目添加到一个线程的默认BlockingCollection (在引擎盖下依赖于ConcurrentQueue )并不能保证它们将从另一个调用该线程的线程中弹出BlockingCollectionGetConsumingEnumerable()方法。

我创建了一个非常简单的Winforms应用程序来重现/模拟它,它只是将整数打印到屏幕上。

  • Timer1负责排队工作项……它使用一个名为_tracker的并发字典,以便它知道它已经添加到阻塞集合中的内容。
  • Timer2只记录BlockingCollection_tracker的计数状态
  • START按钮启动Paralell.ForEach ,它只是遍历阻塞集合GetConsumingEnumerable()并开始将它们打印到第二个列表框。
  • STOP按钮停止Timer1防止更多条目被添加到阻塞集合中。
 public partial class Form1 : Form { private int Counter = 0; private BlockingCollection _entries; private ConcurrentDictionary _tracker; private CancellationTokenSource _tokenSource; private TaskFactory _factory; public Form1() { _entries = new BlockingCollection(); _tracker = new ConcurrentDictionary(); _tokenSource = new CancellationTokenSource(); _factory = new TaskFactory(); InitializeComponent(); } private void timer1_Tick(object sender, EventArgs e) { //ADDING TIMER -> LISTBOX 1 for(var i = 0; i  LIST BOX 3 listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count)); } private void button1_Click(object sender, EventArgs e) { //START BUTTON -> LOGS TO LIST BOX 2 var options = new ParallelOptions { CancellationToken = _tokenSource.Token, MaxDegreeOfParallelism = 1 }; _factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); }); timer1.Enabled = timer2.Enabled = true; timer1.Start(); timer2.Start(); } private void DoWork(int entry) { Thread.Sleep(1000); //Sleep for 1 second to simulate work being done. Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry)))); int oldEntry; _tracker.TryRemove(entry, out oldEntry); } private void button2_Click(object sender, EventArgs e) { //STOP BUTTON timer1.Stop(); timer1.Enabled = false; } 

这是事件的顺序:

  • 按开始
  • Timer1滴答和ListBox1立即更新3条消息(添加0,1,2)
  • ListBox2随后更新为3条消息,相隔1秒
    • 处理0
    • 处理1
    • 处理2
  • Timer1滴答和ListBox1立即更新3条消息(添加3,4,5)
  • ListBox2随后更新了2条消息,相隔1秒
    • 处理3
    • 处理4
    • 处理5没有打印……似乎已“失踪”
  • 按STOP可防止计时器1添加更多消息
  • 等待……“处理5”仍然没有出现

缺少参赛作品

您可以看到并发字典仍在跟踪尚未处理的1个项目,并随后从_tracker删除

如果我再次按Start,则timer1开始添加更多3个条目,并行循环恢复生命打印5,6,7和8。

在后面的项目推进后面的条目返回

我完全不知道为什么会这样。 再次调用启动显然会调用newtask,它会调用Paralell foreach,并重新执行GetConsumingEnumerable(),它会神奇地找到丢失的条目……

为什么BlockingCollection.GetConsumingEnumerable()不保证迭代添加到集合中的每个项目。

为什么随后添加更多条目会导致它“无法”并继续处理?

您不能在Parallel.ForEach()使用GetConsumingEnumerable() Parallel.ForEach()

使用TPL附加function中的GetConsumingPartitioner

在博客文章中,您还将获得解释为什么不能使用GetConsumingEnumerable()

Parallel.ForEach和PLINQ默认使用的分区算法使用分块以最小化同步成本:而不是每个元素锁定一次,它将获取锁定,获取一组元素(一个块),然后释放锁。

即Parallel.ForEach在继续之前等待它收到一组工作项。 正是您的实验所显示的内容。

从.net 4.5开始,您可以创建一个一次只能使用1个项目的分区程序:

 var partitioner = Partitioner.Create(jobsBatchesQ.queue.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering); Parallel.ForEach(partitioner, new ParallelOptions { MaxDegreeOfParallelism = (currentTask.ParallelLevel > 0 ? currentTask.ParallelLevel : 1) }, (batch, state) => {//do stuff} 

https://msdn.microsoft.com/en-us/library/system.collections.concurrent.enumerablepartitioneroptions(v=vs.110).aspx

我无法用简单的控制台应用程序复制你的行为,基本上做同样的事情(在.Net 4.5 beta上运行,这可能会有所不同)。 但我认为发生这种情况的原因是Parallel.ForEach()尝试通过将输入集合拆分为块来优化执行。 使用您的枚举,只有在向集合中添加更多项目之后才能创建块。 有关更多信息,请参阅MSDN上的PLINQ和TPL的自定义分区程序

要解决此问题,请不要使用Parallel.ForEach() 。 如果您仍希望并行处理项目,则可以在每次迭代中启动Task

我觉得我应该注意,为了清楚起见,在执行Parallel.foreach之前能够调用BlockingCollection的.CompleteAdding()方法的情况下,上面描述的问题不会成为问题。 我已经多次使用这两个对象并取得了很好的效果。

此外,您可以在调用CompleteAdding()之后重新设置BlockingCollection以在需要时添加更多项目(_entries = new BlockingCollection();)

如下所示更改上面的单击事件代码可以解决缺少条目的问题并使其按预期工作,如果您多次单击开始和停止按钮:

 private void button2_Click(object sender, EventArgs e) { //STOP BUTTON timer1.Stop(); timer1.Enabled = false; >>>>_entries.CompleteAdding(); >>>>_entries = new BlockingCollection(); }