如何表示我的数据流完成?

我有一个类实现了一个使用TPL Dataflow由3个步骤组成的数据流。

在构造函数中,我将步骤创建为TransformBlocks并使用LinkTo将其链接起来,并将DataflowLinkOptions.PropagateCompletion设置为true。 该类公开了一个方法,通过在第一步调用SendAsync来启动工作流。 该方法返回工作流程最后一步的“完成”属性。

目前,工作流中的步骤似乎按预期执行,但最终步骤永远不会完成,除非我明确地在其上调用Complete。 但这样做会使工作流程短路并且没有执行任何步骤? 我究竟做错了什么?

public class MessagePipeline { private TransformBlock step1; private TransformBlock step2; private TransformBlock step3; public MessagePipeline() { var linkOptions = new DataflowLinkOptions { PropagateCompletion = true }; step1 = new TransformBlock( x => { Console.WriteLine("Step1..."); return x; }); step2 = new TransformBlock( x => { Console.WriteLine("Step2..."); return x; }); step3 = new TransformBlock( x => { Console.WriteLine("Step3..."); return x; }); step1.LinkTo(step2, linkOptions); step2.LinkTo(step3, linkOptions); } public Task Push(object message) { step1.SendAsync(message); step1.Complete(); return step3.Completion; } } ... public class Program { public static void Main(string[] args) { var pipeline = new MessagePipeline(); var result = pipeline.Push("Hello, world!"); result.ContinueWith(_ => Console.WriteLine("Completed")); Console.ReadLine(); } } 

链接这些步骤时,需要传递DataflowLinkOptions,并将PropagateCompletion属性设置为true以传播完成和错误。 一旦你这样做,在第一个块上调用Complete()将完成下游块的完成。

一旦块接收到完成事件,它就完成处理,然后通知其链接的下游目标。

这样,您可以将所有数据发布到第一步并调用Complete() 。 最后一个块仅在所有上游块完成后才能完成。

例如,

 var linkOptions=new DataflowLinkOptions { PropagateCompletion = true}; myFirstBlock.LinkTo(mySecondBlock,linkOptions); mySecondBlock.LinkTo(myFinalBlock,linkOptions); foreach(var message in messages) { myFirstBlock.Post(message); } myFirstBlock.Complete(); ...... await myFinalBlock.Completion; 

默认情况下,PropagateCompletion不成立,因为在更复杂的场景中(例如非线性流或动态更改流),您不希望完成和错误自动传播。 如果要在不终止整个流程的情况下处理错误,您可能还希望避免自动完成。

当TPL Dataflow处于测试阶段时,默认情况属实,但在RTM上已经改变了

UPDATE

代码永远不会完成,因为最后一步是TransformBlock ,没有链接目标来接收其输出。 这意味着即使块接收到完成信号,它也没有完成所有工作,也无法更改自己的完成状态。

将其更改为ActionBlock消除此问题。

您需要明确调用Complete。