TPL Dataflow: bounded capacity and waiting for completion

For simplicity, I've reproduced a real-world scenario below as a LINQPad script:

    var total = 1 * 1000 * 1000;
    var cts = new CancellationTokenSource();
    var threads = Environment.ProcessorCount;

    int capacity = 10;
    var edbOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = capacity, CancellationToken = cts.Token, MaxDegreeOfParallelism = threads };
    var dbOptions = new DataflowBlockOptions { BoundedCapacity = capacity, CancellationToken = cts.Token };
    var gdbOptions = new GroupingDataflowBlockOptions { BoundedCapacity = capacity, CancellationToken = cts.Token };
    var dlOptions = new DataflowLinkOptions { PropagateCompletion = true };

    var counter1 = 0;
    var counter2 = 0;
    var delay1 = 10;
    var delay2 = 25;

    var action1 = new Func<IEnumerable<string>, Task>(async x => { await Task.Delay(delay1); Interlocked.Increment(ref counter1); });
    var action2 = new Func<IEnumerable<string>, Task>(async x => { await Task.Delay(delay2); Interlocked.Increment(ref counter2); });

    var actionBlock1 = new ActionBlock<IEnumerable<string>>(action1, edbOptions);
    var actionBlock2 = new ActionBlock<IEnumerable<string>>(action2, edbOptions);

    var batchBlock1 = new BatchBlock<string>(5, gdbOptions);
    var batchBlock2 = new BatchBlock<string>(5, gdbOptions);
    batchBlock1.LinkTo(actionBlock1, dlOptions);
    batchBlock2.LinkTo(actionBlock2, dlOptions);

    var bufferBlock1 = new BufferBlock<string>(dbOptions);
    var bufferBlock2 = new BufferBlock<string>(dbOptions);
    bufferBlock1.LinkTo(batchBlock1, dlOptions);
    bufferBlock2.LinkTo(batchBlock2, dlOptions);

    var bcBlock = new BroadcastBlock<string>(x => x, dbOptions);
    bcBlock.LinkTo(bufferBlock1, dlOptions);
    bcBlock.LinkTo(bufferBlock2, dlOptions);

    var mainBlock = new TransformBlock<int, string>(x => x.ToString(), edbOptions);
    mainBlock.LinkTo(bcBlock, dlOptions);

    mainBlock.Dump("Main Block");
    bcBlock.Dump("Broadcast Block");
    bufferBlock1.Dump("Buffer Block 1");
    bufferBlock2.Dump("Buffer Block 2");
    actionBlock1.Dump("Action Block 1");
    actionBlock2.Dump("Action Block 2");

    foreach (var i in Enumerable.Range(1, total))
        await mainBlock.SendAsync(i, cts.Token);

    mainBlock.Complete();
    await Task.WhenAll(actionBlock1.Completion, actionBlock2.Completion);

    counter1.Dump("Counter 1");
    counter2.Dump("Counter 2");

I have two questions about this code:

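  1. Although I limited BoundedCapacity to 10 elements on all the relevant blocks, it seems I can push all 1,000,000 messages through almost at once. Is this the expected behavior?
  2. Although the whole network is configured to propagate completion, all blocks seem to complete almost immediately after calling mainBlock.Complete(). I expected both counter1 and counter2 to be equal to total. Is there a way to achieve this behavior?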

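Yes, this is expected behavior, because of BroadcastBlock. Its documentation says that it:

"Provides a buffer for storing at most one element at time, overwriting each message with the next as it arrives."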

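This means that if you link a BroadcastBlock to blocks that have a BoundedCapacity, you will lose messages: when a target is full, it postpones the offer, and the BroadcastBlock moves on to the next message instead of waiting. Because the BroadcastBlock keeps accepting messages regardless, it effectively never pushes back on mainBlock, which is why all the messages appear to go through almost at once. The same loss explains your second question: the blocks do complete normally after mainBlock.Complete(), they just have far fewer messages left to process, so the counters end up well below the value you expected.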
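To make the difference concrete, here is a minimal sketch, assuming .NET 6+ top-level statements and that the System.Threading.Tasks.Dataflow assembly is available (on some targets it has to be added as a NuGet package). It contrasts a bounded BufferBlock used directly, where SendAsync waits for room, with the same kind of bounded block placed behind a BroadcastBlock, where every Post is accepted and the overflow is dropped. The capacity of 1 and the 100 messages are arbitrary illustration values.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// 1) A bounded block on its own applies back-pressure: SendAsync waits while it is full.
var bounded = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });
await bounded.SendAsync(1);                      // fills the only slot
Task<bool> pendingSend = bounded.SendAsync(2);   // no room, so the offer is postponed
bool sendWasBlocked = !pendingSend.IsCompleted;
await bounded.ReceiveAsync();                    // free the slot...
bool secondAccepted = await pendingSend;         // ...and the postponed send now succeeds

// 2) Behind a BroadcastBlock, the same bounded target never slows the producer:
//    the broadcast block accepts every message and moves on when the target declines.
var broadcast = new BroadcastBlock<int>(x => x);
var target = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });
broadcast.LinkTo(target);

const int sent = 100;
int accepted = 0;
for (int i = 0; i < sent; i++)
{
    if (broadcast.Post(i)) accepted++;           // the broadcast block accepts every message
}
broadcast.Complete();
await broadcast.Completion;                      // completes without waiting for the full target

int buffered = target.Count;                     // nobody consumed from target, so at most 1
Console.WriteLine($"direct send blocked: {sendWasBlocked}, later accepted: {secondAccepted}");
Console.WriteLine($"broadcast accepted {accepted} of {sent}; target holds {buffered}");
```

Nothing in the second half ever waits, yet at most one of the 100 messages ends up in the target; the rest are simply gone.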

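To work around this, you could create a custom block that behaves like BroadcastBlock but guarantees delivery to all targets. Doing that properly is not trivial, though, so you might be satisfied with a simpler variant (originally from an older answer of mine):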

    public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(
        IEnumerable<ITargetBlock<T>> targets, DataflowBlockOptions options)
    {
        var targetsList = targets.ToList();

        // Forward each item to every target in turn; awaiting SendAsync means a full
        // (bounded) target makes this block wait instead of dropping the item.
        var block = new ActionBlock<T>(
            async item =>
            {
                foreach (var target in targetsList)
                {
                    await target.SendAsync(item);
                }
            },
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = options.BoundedCapacity,
                CancellationToken = options.CancellationToken
            });

        // The targets are not linked to this block, so propagate
        // completion (or failure) to them manually.
        block.Completion.ContinueWith(task =>
        {
            foreach (var target in targetsList)
            {
                if (task.Exception != null)
                    target.Fault(task.Exception);
                else
                    target.Complete();
            }
        });

        return block;
    }

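In your case, the usage would be:

    var bcBlock = CreateGuaranteedBroadcastBlock(
        new[] { bufferBlock1, bufferBlock2 }, dbOptions);

The returned block is only an ITargetBlock<string>, and the targets are handed to the helper instead of being linked, so the two bcBlock.LinkTo(...) calls have to be removed. Also note that each action increments its counter once per batch of 5, so even with guaranteed delivery counter1 and counter2 end up at total / 5 rather than total.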
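Putting it together, here is a sketch of the question's pipeline with the helper plugged in. It assumes .NET 6+ top-level statements and an available System.Threading.Tasks.Dataflow assembly; it is scaled down to 1,000 messages with 1 ms and 2 ms delays, drops the cancellation token, and replaces the LINQPad Dump() calls with Console output. The helper is repeated as a static local function only so the program is self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var total = 1000;   // scaled down from 1,000,000
int capacity = 10;
var edbOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = capacity, MaxDegreeOfParallelism = Environment.ProcessorCount };
var dbOptions = new DataflowBlockOptions { BoundedCapacity = capacity };
var gdbOptions = new GroupingDataflowBlockOptions { BoundedCapacity = capacity };
var dlOptions = new DataflowLinkOptions { PropagateCompletion = true };

int counter1 = 0, counter2 = 0;
// Each action runs once per batch, so the counters count batches, not items.
var actionBlock1 = new ActionBlock<IEnumerable<string>>(async x => { await Task.Delay(1); Interlocked.Increment(ref counter1); }, edbOptions);
var actionBlock2 = new ActionBlock<IEnumerable<string>>(async x => { await Task.Delay(2); Interlocked.Increment(ref counter2); }, edbOptions);

var batchBlock1 = new BatchBlock<string>(5, gdbOptions);
var batchBlock2 = new BatchBlock<string>(5, gdbOptions);
batchBlock1.LinkTo(actionBlock1, dlOptions);
batchBlock2.LinkTo(actionBlock2, dlOptions);

var bufferBlock1 = new BufferBlock<string>(dbOptions);
var bufferBlock2 = new BufferBlock<string>(dbOptions);
bufferBlock1.LinkTo(batchBlock1, dlOptions);
bufferBlock2.LinkTo(batchBlock2, dlOptions);

// Replaces the BroadcastBlock and its two LinkTo calls.
var bcBlock = CreateGuaranteedBroadcastBlock(new[] { bufferBlock1, bufferBlock2 }, dbOptions);

var mainBlock = new TransformBlock<int, string>(x => x.ToString(), edbOptions);
mainBlock.LinkTo(bcBlock, dlOptions);

foreach (var i in Enumerable.Range(1, total))
    await mainBlock.SendAsync(i);   // now actually waits when the pipeline is full

mainBlock.Complete();
await Task.WhenAll(actionBlock1.Completion, actionBlock2.Completion);
Console.WriteLine($"counter1 = {counter1}, counter2 = {counter2} (batches of 5 from {total} items)");

static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(IEnumerable<ITargetBlock<T>> targets, DataflowBlockOptions options)
{
    var targetsList = targets.ToList();
    var block = new ActionBlock<T>(async item =>
    {
        foreach (var target in targetsList)
            await target.SendAsync(item);   // wait for each target instead of dropping
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = options.BoundedCapacity, CancellationToken = options.CancellationToken });

    // Manually complete (or fault) the unlinked targets once this block is done.
    block.Completion.ContinueWith(task =>
    {
        foreach (var target in targetsList)
        {
            if (task.Exception != null) target.Fault(task.Exception);
            else target.Complete();
        }
    });
    return block;
}
```

Since every item now reaches both branches, each counter should end at total / 5 (200 here), and the producer loop is throttled by the bounded blocks instead of racing through.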