
在处理这个问题 的答案时 ,我写了这个片段:

var buffer = new BufferBlock(); var producer = Task.Run(async () => { while (true) { await Task.Delay(TimeSpan.FromMilliseconds(100)); buffer.Post(null); Console.WriteLine("Post " + buffer.Count); } }); var consumer = Task.Run(async () => { while (await buffer.OutputAvailableAsync()) { IList items; buffer.TryReceiveAll(out items); Console.WriteLine("TryReceiveAll " + buffer.Count); } }); await Task.WhenAll(consumer, producer); 


实际发生的是生产者清除所有项目一次,然后再也不会超出OutputAvailableAsync 。 如果我切换消费者逐个删除项目,它将作为例外工作:

 while (await buffer.OutputAvailableAsync()) { object item; while (buffer.TryReceive(out item)) ; } 

我误会了什么吗? 如果没有,问题是什么?

这是一个由BufferBlock内部使用的SourceCore中的错误。 TryReceiveAll方法在TryReceiveAll执行时不会打开_enableOffering布尔数据成员。 这导致从OutputAvailableAsync返回的任务永远不会完成。


 var buffer = new BufferBlock(); buffer.Post(null); IList items; buffer.TryReceiveAll(out items); var outputAvailableAsync = buffer.OutputAvailableAsync(); buffer.Post(null); await outputAvailableAsync; // Never completes 

我刚刚使用此拉取请求将其修复到.Net核心存储库中。 希望修复程序很快就会在nuget包中找到它。

唉,这是2015年9月底,虽然i3arnon修复了错误,但在错误修复两天后发布的版本中没有解决:Microsoft TPL Dataflow版本4.5.24。

但是,IReceivableSourceBlock.TryReceive(…)可以正常工作。 扩展方法将解决问题。 在新版本的TPL Dataflow之后,可以轻松更改扩展方法。

 ///  /// This extension method returns all available items in the IReceivableSourceBlock /// or an empty sequence if nothing is available. The functin does not wait. ///  /// The type of items stored in the IReceivableSourceBlock /// the source where the items should be extracted from  /// The IList with the received items. Empty if no items were available public static IList TryReceiveAllEx(this IReceivableSourceBlock buffer) { /* Microsoft TPL Dataflow version 4.5.24 contains a bug in TryReceiveAll * Hence this function uses TryReceive until nothing is available anymore * */ IList receivedItems = new List(); T receivedItem = default(T); while (buffer.TryReceive(out receivedItem)) { receivedItems.Add(receivedItem); } return receivedItems; } 


 while (await this.bufferBlock.OutputAvailableAsync()) { // some data available var receivedItems = this.bufferBlock.TryReceiveAllEx(); if (receivedItems.Any()) { ProcessReceivedItems(bufferBlock); } }