在数据流网络中使用BufferBlock 的好处

我想知道使用链接到一个或多个ActionBlocks的BufferBlock是否有好处,除了限制(使用BoundedCapacity),而不是直接发布到ActionBlock(只要不需要限制)。

如果您只想将项目从一个块转发到其他几个块,则不需要BufferBlock

但肯定会有一些有用的案例。 例如,如果您有一个复杂的数据流网络,您可能希望从较小的子网络构建它,每个子网络都使用自己的方法创建。 要做到这一点,你需要一些方法来表示一组块。 在您提到的情况下,从方法返回单个BufferBlock (可能是ITargetBlock )将是一个简单的解决方案。

BufferBlock有用的另一个例子是,如果要将项目从多个源块发送到多个目标块。 如果您使用BufferBlock作为中介, BufferBlock每个源块连接到每个目标块。

我确信还有很多其他的例子可以使用BufferBlock 。 当然,如果你没有看到任何理由在你的情况下使用它,那么不要。

要添加到svick的答案,缓冲区块还有另一个好处。 如果您有一个具有多个输出链接的块并希望在它们之间取得平衡,则必须将输出块转为非贪婪并添加缓冲区块来处理排队。 我发现以下示例很有用:

引用现在已经死亡的链接:

这是我们计划做的事情:

  • 一些代码块将使用Post(T t)方法将数据发布到BufferBlock。
  • 此BufferBlock使用BufferBlock的LinkTo t)方法链接到3个ActionBlock实例。

注意,BufferBlock不会将输入数据的副本切换到它链接到的所有目标块。而是仅对一个目标块执行此操作。这是我们期望当一个目标忙于处理请求时。它将被传递到了另一个目标。现在让我们参考下面的代码:

 static void Main(string[] args) { BufferBlock bb = new BufferBlock(); ActionBlock a1 = new ActionBlock((a) => { Thread.Sleep(100); Console.WriteLine("Action A1 executing with value {0}", a); } ); ActionBlock a2 = new ActionBlock((a) => { Thread.Sleep(50); Console.WriteLine("Action A2 executing with value {0}", a); } ); ActionBlock a3 = new ActionBlock((a) => { Thread.Sleep(50); Console.WriteLine("Action A3 executing with value {0}", a); } ); bb.LinkTo(a1); bb.LinkTo(a2); bb.LinkTo(a3); Task t = new Task(() => { int i = 0; while (i < 10) { Thread.Sleep(50); i++; bb.Post(i); } } ); t.Start(); Console.Read(); } 

执行时会产生以下输出:

  • 动作A1以值1执行
  • 动作A1以值2执行
  • 动作A1以值3执行
  • 动作A1以值4执行
  • 动作A1以值5执行
  • 动作A1以值6执行
  • 动作A1以值7执行
  • 动作A1以值8执行
  • 动作A1以值9执行
  • 动作A1以值10执行

这表明只有一个目标实际上正在执行所有数据,即使它很忙(由于Thread.Sleep(100)有意添加)。为什么?

这是因为所有目标块在本质上都是贪婪的,并且即使在它们无法处理数据时也会缓冲输入。为了改变这种行为,我们在DataFlowBlockOptions中将Greedy属性设置为false,同时初始化ActionBlock,如下所示。

 static void Main(string[] args) { BufferBlock bb = new BufferBlock(); ActionBlock a1 = new ActionBlock((a) => { Thread.Sleep(100); Console.WriteLine("Action A1 executing with value {0}", a); } , new DataflowBlockOptions(taskScheduler: TaskScheduler.Default, maxDegreeOfParallelism: 1, maxMessagesPerTask: 1, cancellationToken: CancellationToken.None, //Not Greedy greedy: false) ); ActionBlock a2 = new ActionBlock((a) => { Thread.Sleep(50); Console.WriteLine("Action A2 executing with value {0}", a); } , new DataflowBlockOptions(taskScheduler: TaskScheduler.Default, maxDegreeOfParallelism: 1, maxMessagesPerTask: -1, cancellationToken: CancellationToken.None, greedy: false) ); ActionBlock a3 = new ActionBlock((a) => { Thread.Sleep(50); Console.WriteLine("Action A3 executing with value {0}", a); } , new DataflowBlockOptions(taskScheduler: TaskScheduler.Default, maxDegreeOfParallelism: 1, maxMessagesPerTask: -1, cancellationToken: CancellationToken.None, greedy: false) ); bb.LinkTo(a1); bb.LinkTo(a2); bb.LinkTo(a3); Task t = new Task(() => { int i = 0; while (i < 10) { Thread.Sleep(50); i++; bb.Post(i); } } ); t.Start(); Console.Read(); } 

该程序的输出是:

  • 动作A1以值1执行
  • 动作A2以值3执行
  • 动作A1以值2执行
  • 动作A3以值6执行
  • 动作A3以值7执行
  • 动作A3以值8执行
  • 动作A2以值5执行
  • 动作A3以值9执行
  • 动作A1以值4执行
  • 动作A2以值10执行

这显然是按预期在三个ActionBlock中分布数据。

不,第二个例子不会因为多种原因而编译:只能为“分组”数据流块设置greedy = false – 不能为执行块设置; 然后它必须通过GroupingDataflowBlockOptions设置 – 而不是DataflowBlockOptions; 然后将其设置为属性值“{Greedy = false}”而不是构造函数参数。

如果要限制动作块的容量,可以通过设置DataflowBlockOptions的BoundedCapacity属性的值来实现(尽管如OP所述,他们已经知道了这个选项)。 像这样:

 var a1 = new ActionBlock( i => doSomeWork(i), new ExecutionDataflowBlockOptions {BoundedCapacity = 1} );