TPL数据流加速?

我想知道以下代码是否可以优化以更快地执行。 在一个非常简单的数据流结构中,我目前似乎每秒大约有140万条简单消息。 我知道这个示例进程同步传递/转换消息,但是,我目前正在测试TPL Dataflow作为我自己的基于任务和并发集合的自定义解决方案的替代。 我知道术语“并发”已经建议我并行运行,但是出于当前的测试目的,我通过同步推送消息在我自己的解决方案上,每秒我得到大约510万条消息。 我在这里缺少什么,我读到TPL Dataflow被推为高吞吐量,低延迟的解决方案,但到目前为止我必须忽略性能调整。 有谁能指出我正确的方向吗?

class TPLDataFlowExperiments { public TPLDataFlowExperiments() { var buf1 = new BufferBlock(); var transform = new TransformBlock(t => { return ""; }); var action = new ActionBlock(s => { //Thread.Sleep(100); //Console.WriteLine(s); }); buf1.LinkTo(transform); transform.LinkTo(action); //Propagate all Completions down the flow buf1.Completion.ContinueWith(t => { transform.Complete(); transform.Completion.ContinueWith(u => { action.Complete(); }); }); Stopwatch watch = new Stopwatch(); watch.Start(); int cap = 10000000; for (int i = 0; i  { watch.Stop(); Console.WriteLine("All Blocks finished processing"); Console.WriteLine("Units processed per second: " + cap / watch.ElapsedMilliseconds * 1000); }); Console.ReadLine(); } } 

我认为这主要归结为一件事:你的测试几乎毫无意义。 所有这些块应该做某事,并使用多个内核和异步操作来做到这一点。

此外,在您的测试中,可能会花费大量时间进行同步。 使用更实际的代码,代码将花费一些时间来执行,因此争用会更少,因此实际开销将小于您测量的开销。

但要真正回答你的问题,是的,你忽略了一些性能调整。 具体来说,可以使用SingleProducerConstrained ,这意味着可以使用锁定较少的数据结构。 如果我在两个块上使用它( BufferBlock在这里完全没用,你可以安全地删除它),速率从我的计算机上的每秒约3-4百万件增加到超过500万件。

要添加到svick的答案,测试仅对单个操作块使用单个处理线程。 这样它只测试使用块的开销。

DataFlow的工作方式类似于F#Agents,Scala actors和MPI实现。 每个操作块一次执行一个任务,监听输入并生成输出。 通过以可在多个核上独立执行的步骤中断算法来提供加速,仅将消息传递给彼此。

虽然可以增加并发任务的数量,但最重要的问题是设计一个独立于其他任务执行最大步骤数量的流程。

您还可以增加数据流块的并行度。 这可以提供额外的加速,并且如果您发现其中一个块作为其余块的瓶颈,也可以帮助在线性任务之间进行负载平衡。