表观BufferBlock.Post/Receive/ReceiveAsync种族/错误

交叉发布到http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9

我知道……我并没有真正使用TplDataflow来发挥它的最大潜力。 ATM我只是使用BufferBlock作为消息传递的安全队列,其中生产者和消费者以不同的速率运行。 我看到一些奇怪的行为让我感到难以理解如何继续。

 private BufferBlock messageQueue = new BufferBlock(); public void Send(object message) { var accepted=messageQueue.Post(message); logger.Info("Send message was called qlen = {0} accepted={1}", messageQueue.Count,accepted); } public async Task GetMessageAsync() { try { var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30)); //despite messageQueue.Count>0 next line //occasionally does not execute logger.Info("message received"); //....... } catch(TimeoutException) { //do something } } 

在上面的代码中(它是2000行分布式解决方案的一部分),每100ms左右定期调用Send 。 这意味着一个项目每秒大约10次PostmessageQueue 。 这已经过validation。 但是,偶尔看起来ReceiveAsync在超时内没有完成(即Post没有导致ReceiveAsync完成)并且30秒后引发了TimeoutException 。 此时, messageQueue.Count数百个。 这是出乎意料的。 在发布速度较慢(1个post/秒)时也观察到此问题,并且通常在1000个项目通过BufferBlock之前发生。

因此,要解决此问题,我使用以下代码,它可以工作,但偶尔会在接收时导致1秒延迟(由于上面发生的错误)

  public async Task GetMessageAsync() { try { object m; var attempts = 0; for (; ; ) { try { m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1)); } catch (TimeoutException) { attempts++; if (attempts >= 30) throw; continue; } break; } logger.Info("message received"); //....... } catch(TimeoutException) { //do something } } 

这看起来像TDF中的竞争条件,但我无法BufferBlock为什么在我以类似方式使用BufferBlock的其他地方不会发生这种情况。 从ReceiveAsync实验性地更改为Receive没有帮助。 我没有检查过,但我想孤立一点,上面的代码完美无缺。 这是我在“TPL数据流简介” tpldataflow.docx中记录的模式 。

我该怎么做才能找到底线? 是否有任何指标可能有助于推断正在发生的事情? 如果我无法创建可靠的测试用例,我可以提供哪些更多信息?

救命!

斯蒂芬似乎认为以下是解决方案

var m = await messageQueue.ReceiveAsync();

代替:

var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));

你能否证实或否认这一点?