TPL Dataflow仅在完成所有源数据块时保证完成
两个完成转换后,如何重新编写代码完成的代码? 我认为完成意味着它被标记为完成并且“出队列”是空的?
public Test() { broadCastBlock = new BroadcastBlock(i => { return i; }); transformBlock1 = new TransformBlock(i => { Console.WriteLine("1 input count: " + transformBlock1.InputCount); Thread.Sleep(50); return ("1_" + i); }); transformBlock2 = new TransformBlock(i => { Console.WriteLine("2 input count: " + transformBlock1.InputCount); Thread.Sleep(20); return ("2_" + i); }); processorBlock = new ActionBlock(i => { Console.WriteLine(i); }); //Linking broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true }); broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true }); transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true }); transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true }); } public void Start() { const int numElements = 100; for (int i = 1; i <= numElements; i++) { broadCastBlock.SendAsync(i); } //mark completion broadCastBlock.Complete(); processorBlock.Completion.Wait(); Console.WriteLine("Finished"); Console.ReadLine(); } }
我编辑了代码,为每个变换块添加了输入缓冲区计数。 显然,所有100个项目都流式传输到每个变换块。 但是只要其中一个transformblock完成,处理器块就不再接受任何项目,而不完整的transformblock的输入缓冲区只是刷新输入缓冲区。
问题正是casperOne在他的回答中所说的。 第一个转换块完成后,处理器块进入“完成模式”:它将处理其输入队列中的剩余项目,但不会接受任何新项目。
除了将处理器块分成两部分之外,还有一个更简单的解决方法:不要设置PropagateCompletion
,而是在两个转换块完成时手动设置处理器块的完成:
Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion) .ContinueWith(_ => processorBlock.Complete());
这里的问题是,每次调用LinkTo
方法链接块和转换块中的等待时间不同时,都要设置PropagateCompletion
属性 。
从IDataflowBlock
接口上的Complete
方法的文档(强调我的):
向IDataflowBlock发出信号,表示它不应该接受也不会产生任何更多的消息, 也不会消耗更多的推迟消息 。
因为您在每个TransformBlock
实例中错开了等待时间,所以transformBlock2
(等待20 ms)在transformBlock1
(等待50 ms)之前完成。 transformBlock2
首先完成,然后将信号发送到processorBlock
,然后说“我不接受任何其他”(并且transformBlock1
还没有生成它的所有消息)。
注意, transformBlock1
之前的transformBlock1
处理并不是绝对保证的; 线程池(假设您正在使用默认调度程序)将以不同的顺序处理任务是可行的(但很可能不会,因为一旦完成20 ms项目,它将从队列中窃取工作)。
您的管道如下所示:
broadcastBlock / \ transformBlock1 transformBlock2 \ / processorBlock
为了解决这个问题,您希望拥有一个如下所示的管道:
broadcastBlock / \ transformBlock1 transformBlock2 | | processorBlock1 processorBlock2
这是通过创建两个单独的ActionBlock
实例来完成的,如下所示:
// The action, can be a method, makes it easier to share. Action a = i => Console.WriteLine(i); // Create the processor blocks. processorBlock1 = new ActionBlock (a); processorBlock2 = new ActionBlock (a); // Linking broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true }); broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true }); transformBlock1.LinkTo(processorBlock1, new DataflowLinkOptions { PropagateCompletion = true }); transformBlock2.LinkTo(processorBlock2, new DataflowLinkOptions { PropagateCompletion = true });
然后,您需要等待两个处理器块而不是一个:
Task.WhenAll(processorBlock1.Completion, processorBlock2.Completion).Wait();
这是一个非常重要的说明; 在创建ActionBlock
,默认情况下将传递给它的ExecutionDataflowBlockOptions
实例上的MaxDegreeOfParallelism
属性设置为1。
这意味着对传递给ActionBlock
的Action
委托的调用是线程安全的,一次只执行一个。
因为您现在有两个指向同一个Action
委托的ActionBlock
实例,所以不能保证线程安全。
如果您的方法是线程安全的,那么您不必执行任何操作(这将允许您将MaxDegreeOfParallelism
属性设置为DataflowBlockOptions.Unbounded
,因为没有理由阻止)。
如果它不是线程安全的,并且您需要保证它,则需要求助于传统的同步原语,例如lock
语句 。
在这种情况下,你会这样做(尽管显然不需要,因为Console
类上的WriteLine
方法是线程安全的):
// The lock. var l = new object(); // The action, can be a method, makes it easier to share. Action a = i => { // Ensure one call at a time. lock (l) Console.WriteLine(i); }; // And so on...
svick的答案的补充:为了与使用PropagateCompletion选项获得的行为一致,您还需要在前一个块出现故障的情况下转发exception。 像下面这样的扩展方法也会解决这个问题:
public static void CompleteWhenAll(this IDataflowBlock target, params IDataflowBlock[] sources) { if (target == null) return; if (sources.Length == 0) { target.Complete(); return; } Task.Factory.ContinueWhenAll( sources.Select(b => b.Completion).ToArray(), tasks => { var exceptions = (from t in tasks where t.IsFaulted select t.Exception).ToList(); if (exceptions.Count != 0) { target.Fault(new AggregateException(exceptions)); } else { target.Complete(); } } ); }
其他答案很清楚为什么当一个块有两个以上的源时,PropagateCompletion =真正搞乱了。
要提供一个简单的问题解决方案,您可能需要查看一个开源库DataflowEx ,它可以通过内置的更智能的完成规则解决此类问题。 (它在内部使用TPL Dataflow链接,但支持复杂的完成传播。该实现看起来与WhenAll类似,但也处理动态链接添加。请检查Dataflow.RegisterDependency()和TaskEx.AwaitableWhenAll()以获取impl详细信息。)
我稍微改变了你的代码,使一切都可以使用DataflowEx:
public CompletionDemo1() { broadCaster = new BroadcastBlock( i => { return i; }).ToDataflow(); transformBlock1 = new TransformBlock( i => { Console.WriteLine("1 input count: " + transformBlock1.InputCount); Thread.Sleep(50); return ("1_" + i); }); transformBlock2 = new TransformBlock( i => { Console.WriteLine("2 input count: " + transformBlock2.InputCount); Thread.Sleep(20); return ("2_" + i); }); processor = new ActionBlock( i => { Console.WriteLine(i); }).ToDataflow(); /** rather than TPL linking broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true }); broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true }); transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true }); transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true }); **/ //Use DataflowEx linking var transform1 = transformBlock1.ToDataflow(); var transform2 = transformBlock2.ToDataflow(); broadCaster.LinkTo(transform1); broadCaster.LinkTo(transform2); transform1.LinkTo(processor); transform2.LinkTo(processor); }
完整的代码在这里 。
免责声明:我是DataflowEx的作者,它是在MIT许可下发布的。