在TryReceiveAll之后使用OutputAvailableAsync的BufferBlock死锁
在处理这个问题 的答案时 ,我写了这个片段:
var buffer = new BufferBlock(); var producer = Task.Run(async () => { while (true) { await Task.Delay(TimeSpan.FromMilliseconds(100)); buffer.Post(null); Console.WriteLine("Post " + buffer.Count); } }); var consumer = Task.Run(async () => { while (await buffer.OutputAvailableAsync()) { IList items; buffer.TryReceiveAll(out items); Console.WriteLine("TryReceiveAll " + buffer.Count); } }); await Task.WhenAll(consumer, producer);
生产者应该每100毫秒将项目发布到缓冲区,并且消费者应该清除缓冲区中的所有项目并异步地等待更多项目显示。
实际发生的是生产者清除所有项目一次,然后再也不会超出OutputAvailableAsync
。 如果我切换消费者逐个删除项目,它将作为例外工作:
while (await buffer.OutputAvailableAsync()) { object item; while (buffer.TryReceive(out item)) ; }
我误会了什么吗? 如果没有,问题是什么?
这是一个由BufferBlock
内部使用的SourceCore
中的错误。 TryReceiveAll
方法在TryReceiveAll
执行时不会打开_enableOffering
布尔数据成员。 这导致从OutputAvailableAsync
返回的任务永远不会完成。
这是一个最小的重现:
var buffer = new BufferBlock
我刚刚使用此拉取请求将其修复到.Net核心存储库中。 希望修复程序很快就会在nuget包中找到它。
唉,这是2015年9月底,虽然i3arnon修复了错误,但在错误修复两天后发布的版本中没有解决:Microsoft TPL Dataflow版本4.5.24。
但是,IReceivableSourceBlock.TryReceive(…)可以正常工作。 扩展方法将解决问题。 在新版本的TPL Dataflow之后,可以轻松更改扩展方法。
/// /// This extension method returns all available items in the IReceivableSourceBlock /// or an empty sequence if nothing is available. The functin does not wait. /// /// The type of items stored in the IReceivableSourceBlock /// the source where the items should be extracted from /// The IList with the received items. Empty if no items were available public static IList TryReceiveAllEx (this IReceivableSourceBlock buffer) { /* Microsoft TPL Dataflow version 4.5.24 contains a bug in TryReceiveAll * Hence this function uses TryReceive until nothing is available anymore * */ IList receivedItems = new List (); T receivedItem = default(T); while (buffer.TryReceive (out receivedItem)) { receivedItems.Add(receivedItem); } return receivedItems; }
用法:
while (await this.bufferBlock.OutputAvailableAsync()) { // some data available var receivedItems = this.bufferBlock.TryReceiveAllEx(); if (receivedItems.Any()) { ProcessReceivedItems(bufferBlock); } }