下游的TPL数据流块如何获取源生成的数据?

我正在使用TPL Dataflow处理图像。 我收到处理请求,从流中读取图像,应用几个转换,然后将生成的图像写入另一个流:

Request -> Stream -> Image -> Image ... -> Stream 

为此,我使用块:

 BufferBlock TransformBlock TransformBlock TransformBlock TransformBlock ... writerBlock = new ActionBlock 

问题是初始Request是包含创建结果Stream所需的一些数据以及此时我需要的一些其他信息。 我是否必须将原始Request (或其他一些上下文对象) writerBlock到所有其他块上的writerBlock ,如下所示:

 TransformBlock<Request,Tuple> TransformBlock<Tuple,Tuple> TransformBlock<Tuple,Tuple> ... 

(这是丑陋的),或者有没有办法将第一个块链接到最后一个块(或者,推广到需要附加数据的块)?

是的,你几乎需要做你所描述的,将每个块中的附加数据传递给下一个块。

但是使用几个辅助方法,你可以使这更简单:

 public static IPropagatorBlock> CreateExtendedSource(Func transform) { return new TransformBlock>( input => Tuple.Create(transform(input), input)); } public static IPropagatorBlock, Tuple> CreateExtendedTransform(Func transform) { return new TransformBlock, Tuple>( tuple => Tuple.Create(transform(tuple.Item1), tuple.Item2)); } 

签名看起来令人生畏,但实际上并没有那么糟糕。

此外,您可能希望添加将选项传递给创建的块的重载,或者重载以获取异步委托的重载。

例如,如果您想使用单独的块对数字执行某些操作,同时沿途传递原始数字,您可以执行以下操作:

 var source = new BufferBlock(); var divided = CreateExtendedSource(i => i / 2.0); var formatted = CreateExtendedTransform(d => d.ToString("0.0")); var writer = new ActionBlock>(tuple => Console.WriteLine(tuple)); source.LinkTo(divided); divided.LinkTo(formatted); formatted.LinkTo(writer); for (int i = 0; i < 10; i++) source.Post(i); 

如您所见,您的lambdas(除了最后一个)只处理“当前”值( intdoublestring ,具体取决于管道的阶段),“原始”值(总是int )自动传递。 在任何时候,您都可以使用使用普通构造函数创建的块来访问这两个值(如示例中的最终ActionBlock )。

(那个BufferBlock实际上并不是必需的,但我添加它以更BufferBlock您的设计。)

因为我刚开始使用TPL Dataflow,所以我可能会过头了。 但我相信你可以使用BroadcastBlock作为源和你的第一个目标之间的媒介来实现这一点。

BroadcastBlock可以将消息提供给许多目标,因此您可以使用它向目标提供,也可以使用JoinBlock ,最后将结果与原始消息合并。

 source -> Broadcast ->-----------------------------------------> JoinBlock  -> Transformation1 -> Transformation 'n' -> 

例如:

 var source = new BufferBlock(); var transformation = new TransformBlock(i => i * 100); var broadCast = new BroadcastBlock(null); source.LinkTo(broadCast); broadCast.LinkTo(transformation); var jb = new JoinBlock(); broadCast.LinkTo(jb.Target1); transformation.LinkTo(jb.Target2); jb.LinkTo(new ActionBlock>( c => Console.WriteLine("Source:{0}, Target Result: {1}", c.Item1, c.Item2))); source.Post(1); source.Post(2); source.Complete(); 

收益率…

资料来源:1,目标结果:100

资料来源:2,目标结果:200

我不太确定它在异步环境中的表现。