Why do blocks run in this order?

Here is a short code sample that quickly introduces my problem:

    using System;
    using System.Linq;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    namespace DataflowTest
    {
        class Program
        {
            static void Main(string[] args)
            {
                var firstBlock = new TransformBlock<int, int>(x => x,
                    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

                var secondBlock = new TransformBlock<int, string>(async x =>
                {
                    if (x == 12)
                    {
                        await Task.Delay(5000);
                        return $"{DateTime.Now}: Message is {x} (This is delayed message!) ";
                    }
                    return $"{DateTime.Now}: Message is {x}";
                }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

                var thirdBlock = new ActionBlock<string>(s => Console.WriteLine(s),
                    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

                firstBlock.LinkTo(secondBlock);
                secondBlock.LinkTo(thirdBlock);

                var populateTask = Task.Run(async () =>
                {
                    foreach (var x in Enumerable.Range(1, 15))
                    {
                        await firstBlock.SendAsync(x);
                    }
                });

                populateTask.Wait();
                secondBlock.Completion.Wait();
            }
        }
    }

The output is:

    09.08.2016 15:03:08: Message is 1
    09.08.2016 15:03:08: Message is 5
    09.08.2016 15:03:08: Message is 6
    09.08.2016 15:03:08: Message is 7
    09.08.2016 15:03:08: Message is 8
    09.08.2016 15:03:08: Message is 9
    09.08.2016 15:03:08: Message is 10
    09.08.2016 15:03:08: Message is 11
    09.08.2016 15:03:08: Message is 3
    09.08.2016 15:03:08: Message is 2
    09.08.2016 15:03:08: Message is 4
    09.08.2016 15:03:13: Message is 12 (This is delayed message!)
    09.08.2016 15:03:08: Message is 15
    09.08.2016 15:03:08: Message is 13
    09.08.2016 15:03:08: Message is 14

Why this order, and how can I change the network to get the output below?

    09.08.2016 15:03:08: Message is 1
    09.08.2016 15:03:08: Message is 5
    09.08.2016 15:03:08: Message is 6
    09.08.2016 15:03:08: Message is 7
    09.08.2016 15:03:08: Message is 8
    09.08.2016 15:03:08: Message is 9
    09.08.2016 15:03:08: Message is 10
    09.08.2016 15:03:08: Message is 11
    09.08.2016 15:03:08: Message is 3
    09.08.2016 15:03:08: Message is 2
    09.08.2016 15:03:08: Message is 4
    09.08.2016 15:03:08: Message is 15
    09.08.2016 15:03:08: Message is 13
    09.08.2016 15:03:08: Message is 14
    09.08.2016 15:03:13: Message is 12 (This is delayed message!)

So I am wondering why all the other blocks (or tasks, here) wait for the delayed block.


UPDATE

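Since you asked me to explain my problem in more detail, I made this sample closer to the real pipeline I am working on. Suppose the application downloads some data and computes a hash from the returned response.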

    using System;
    using System.Diagnostics;
    using System.Linq;
    using System.Net.Http;
    using System.Security.Cryptography;
    using System.Text;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    namespace DataflowTest
    {
        class Program
        {
            static void Main(string[] args)
            {
                var firstBlock = new TransformBlock<int, string>(x => x.ToString(),
                    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

                var secondBlock = new TransformBlock<string, Tuple<string, string>>(async x =>
                {
                    using (var httpClient = new HttpClient())
                    {
                        // Simulate one slow request.
                        if (x == "4") await Task.Delay(5000);

                        var result = await httpClient.GetStringAsync($"http://scooterlabs.com/echo/{x}");
                        return new Tuple<string, string>(x, result);
                    }
                }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

                var thirdBlock = new TransformBlock<Tuple<string, string>, Tuple<string, byte[]>>(x =>
                {
                    using (var algorithm = SHA256.Create())
                    {
                        var bytes = Encoding.UTF8.GetBytes(x.Item2);
                        var hash = algorithm.ComputeHash(bytes);
                        return new Tuple<string, byte[]>(x.Item1, hash);
                    }
                }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

                var fourthBlock = new ActionBlock<Tuple<string, byte[]>>(x =>
                {
                    var output = $"{DateTime.Now}: Hash for element #{x.Item1}: {GetHashAsString(x.Item2)}";
                    Console.WriteLine(output);
                }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

                // Propagate completion down the chain so that fourthBlock.Completion can finish.
                var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
                firstBlock.LinkTo(secondBlock, linkOptions);
                secondBlock.LinkTo(thirdBlock, linkOptions);
                thirdBlock.LinkTo(fourthBlock, linkOptions);

                var populateTasks = Enumerable.Range(1, 10).Select(x => firstBlock.SendAsync(x));
                Task.WhenAll(populateTasks).ContinueWith(x => firstBlock.Complete()).Wait();

                fourthBlock.Completion.Wait();
            }

            // Formats the hash as hex, inserting a space after every four bytes.
            private static string GetHashAsString(byte[] bytes)
            {
                var sb = new StringBuilder();
                int i;
                for (i = 0; i < bytes.Length; i++)
                {
                    sb.AppendFormat("{0:X2}", bytes[i]);
                    if (i % 4 == 3) sb.Append(" ");
                }
                return sb.ToString();
            }
        }
    }

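Let's look at the order of the requests:

[Image: Requests]

This definitely makes sense. All requests are made as soon as possible. The slow fourth request is at the end of the list.

Now let's see what output we get: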

    09.08.2016 20:44:53: Hash for element #3: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
    09.08.2016 20:44:53: Hash for element #2: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
    09.08.2016 20:44:53: Hash for element #1: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
    09.08.2016 20:44:58: Hash for element #6: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
    09.08.2016 20:44:58: Hash for element #8: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
    09.08.2016 20:44:58: Hash for element #9: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
    09.08.2016 20:44:58: Hash for element #10: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
    09.08.2016 20:44:58: Hash for element #4: 44A63CBF 8E27D0DD AFE5A761 AADA4E49 AA52FE8E E3D7DC82 AFEAAF1D 72A9BC7F
    09.08.2016 20:44:58: Hash for element #5: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
    09.08.2016 20:44:58: Hash for element #7: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3

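You can see that all hashes after the third one were computed only once the fourth response had arrived (their timestamps are five seconds later).

So, based on these two facts, we can say that all downloaded pages were waiting for the slow fourth request. It would be better not to wait for the fourth request and to compute each hash as soon as its data has been downloaded. Is there any way to achieve this?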

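OK, following the reference from @SirRufo, I started thinking about implementing my own TransformBlock that would suit my needs and process incoming items without preserving their order. That way it would not break up the network by leaving a gap between blocks in the download part, and it would be an elegant solution.

So I started looking into what I could do and how to do it. The TransformBlock source itself seemed like a good starting point, so I opened the TransformBlock source on GitHub and began analyzing it.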

Right at the beginning of the class I found this interesting thing:

    // If parallelism is employed, we will need to support reordering messages that complete out-of-order.
    // However, a developer can override this with EnsureOrdered == false.
    if (dataflowBlockOptions.SupportsParallelExecution && dataflowBlockOptions.EnsureOrdered)
    {
        _reorderingBuffer = new ReorderingBuffer<TOutput>(this, (owningSource, message) =>
            ((TransformBlock<TInput, TOutput>)owningSource)._source.AddMessage(message));
    }

That looks like exactly what we want! Let's look at this EnsureOrdered option in the DataflowBlockOptions class on GitHub:

    /// <summary>Gets or sets whether ordered processing should be enforced on a block's handling of messages.</summary>
    /// <remarks>
    /// By default, dataflow blocks enforce ordering on the processing of messages. This means that a
    /// block like <see cref="TransformBlock{TInput, TOutput}"/> will ensure that messages are output in the same
    /// order they were input, even if parallelism is employed by the block and the processing of a message N finishes
    /// after the processing of a subsequent message N+1 (the block will reorder the results to maintain the input
    /// ordering prior to making those results available to a consumer). Some blocks may allow this to be relaxed,
    /// however. Setting <see cref="EnsureOrdered"/> to false tells a block that it may relax this ordering if
    /// it's able to do so. This can be beneficial if the immediacy of a processed result being made available
    /// is more important than the input-to-output ordering being maintained.
    /// </remarks>
    public bool EnsureOrdered
    {
        get { return _ensureOrdered; }
        set { _ensureOrdered = value; }
    }

It looked really good, so I immediately switched to the IDE to set it. Unfortunately, there was no such setting:

[Image: No EnsureOrdered]

I kept searching and found this note:

4.5.25-beta-23019

Package has been renamed to System.Threading.Tasks.Dataflow

I then googled it and found this package, called System.Threading.Tasks.Dataflow! So I uninstalled the Microsoft.Tpl.Dataflow package and installed System.Threading.Tasks.Dataflow by issuing the following command:

    Install-Package System.Threading.Tasks.Dataflow

And there was the EnsureOrdered option. I updated the code, setting EnsureOrdered to false:

    using System;
    using System.Diagnostics;
    using System.Linq;
    using System.Net.Http;
    using System.Security.Cryptography;
    using System.Text;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    namespace DataflowTest
    {
        class Program
        {
            static void Main(string[] args)
            {
                // Shared options: parallel execution, with output ordering relaxed.
                var options = new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 4,
                    EnsureOrdered = false
                };

                var firstBlock = new TransformBlock<int, string>(x => x.ToString(), options);

                var secondBlock = new TransformBlock<string, Tuple<string, string>>(async x =>
                {
                    using (var httpClient = new HttpClient())
                    {
                        // Simulate one slow request.
                        if (x == "4") await Task.Delay(5000);

                        var result = await httpClient.GetStringAsync($"http://scooterlabs.com/echo/{x}");
                        return new Tuple<string, string>(x, result);
                    }
                }, options);

                var thirdBlock = new TransformBlock<Tuple<string, string>, Tuple<string, byte[]>>(x =>
                {
                    using (var algorithm = SHA256.Create())
                    {
                        var bytes = Encoding.UTF8.GetBytes(x.Item2);
                        var hash = algorithm.ComputeHash(bytes);
                        return new Tuple<string, byte[]>(x.Item1, hash);
                    }
                }, options);

                var fourthBlock = new ActionBlock<Tuple<string, byte[]>>(x =>
                {
                    var output = $"{DateTime.Now}: Hash for element #{x.Item1}: {GetHashAsString(x.Item2)}";
                    Console.WriteLine(output);
                }, options);

                // Propagate completion down the chain so that fourthBlock.Completion can finish.
                var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
                firstBlock.LinkTo(secondBlock, linkOptions);
                secondBlock.LinkTo(thirdBlock, linkOptions);
                thirdBlock.LinkTo(fourthBlock, linkOptions);

                var populateTasks = Enumerable.Range(1, 10).Select(x => firstBlock.SendAsync(x));
                Task.WhenAll(populateTasks).ContinueWith(x => firstBlock.Complete()).Wait();

                fourthBlock.Completion.Wait();
            }

            // Formats the hash as hex, inserting a space after every four bytes.
            private static string GetHashAsString(byte[] bytes)
            {
                var sb = new StringBuilder();
                int i;
                for (i = 0; i < bytes.Length; i++)
                {
                    sb.AppendFormat("{0:X2}", bytes[i]);
                    if (i % 4 == 3) sb.Append(" ");
                }
                return sb.ToString();
            }
        }
    }

The resulting output is exactly what I wanted:

    10.08.2016 11:03:23: Hash for element #3: 8BA8A86D F25E058E 180F7AA9 1EE996B0 8D721C84 AEE8AA19 0A3F7C44 9FFED481
    10.08.2016 11:03:23: Hash for element #1: 8BA8A86D F25E058E 180F7AA9 1EE996B0 8D721C84 AEE8AA19 0A3F7C44 9FFED481
    10.08.2016 11:03:23: Hash for element #2: 8BA8A86D F25E058E 180F7AA9 1EE996B0 8D721C84 AEE8AA19 0A3F7C44 9FFED481
    10.08.2016 11:03:23: Hash for element #10: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
    10.08.2016 11:03:23: Hash for element #8: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
    10.08.2016 11:03:23: Hash for element #9: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
    10.08.2016 11:03:23: Hash for element #5: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
    10.08.2016 11:03:23: Hash for element #7: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
    10.08.2016 11:03:23: Hash for element #6: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
    10.08.2016 11:03:27: Hash for element #4: FD25E52B FCD8DE81 BD38E11B 13C20B96 09473283 F25346B2 04593B70 E4357BDA
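For anyone who wants to see the effect of EnsureOrdered without any network access, here is a minimal sketch. It is not the pipeline above: the class name OrderingDemo, the delays, and the item count are made up for illustration, and it assumes the System.Threading.Tasks.Dataflow package, which is the one that exposes EnsureOrdered.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class OrderingDemo
{
    // Sends items 1..5 through a parallel TransformBlock in which item 1 is slow,
    // and returns the items in the order the block emitted them.
    public static async Task<List<int>> RunAsync(bool ensureOrdered)
    {
        var emitted = new List<int>();

        var transform = new TransformBlock<int, int>(async x =>
        {
            // Hypothetical workload: item 1 takes much longer than the others.
            await Task.Delay(x == 1 ? 500 : 10);
            return x;
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            EnsureOrdered = ensureOrdered
        });

        // Default MaxDegreeOfParallelism is 1, so items are recorded one at a time
        // in the order they leave the transform block.
        var collect = new ActionBlock<int>(x => emitted.Add(x));

        transform.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var x in Enumerable.Range(1, 5)) transform.Post(x);
        transform.Complete();

        await collect.Completion;
        return emitted;
    }

    public static void Main()
    {
        Console.WriteLine("ordered:   " + string.Join(", ", RunAsync(true).GetAwaiter().GetResult()));
        Console.WriteLine("unordered: " + string.Join(", ", RunAsync(false).GetAwaiter().GetResult()));
    }
}
```

With EnsureOrdered = true the items come back in input order even though item 1 finishes last. With EnsureOrdered = false the slow item typically arrives after the fast ones, although the exact order of the fast items depends on timing.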

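This is by design and documented:

Because each predefined source dataflow block type guarantees that messages are propagated out in the order in which they are received, …

Proof: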

    // Fragment for the body of Main; requires the namespaces System, System.Linq,
    // System.Threading and System.Threading.Tasks.Dataflow.
    var ts = Environment.TickCount;

    var firstBlock = new TransformBlock<int, int>(
        x => x,
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
        });

    var secondBlock = new TransformBlock<int, string>(
        x =>
        {
            var start = Environment.TickCount;
            if (x == 3)
            {
                Thread.Sleep(5000);
                return $"Start {start - ts} Finished {Environment.TickCount - ts}: Message is {x} (This is delayed message!) ";
            }
            return $"Start {start - ts} Finished {Environment.TickCount - ts}: Message is {x}";
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            // limit the internal queue to 10 items
            BoundedCapacity = 10,
        });

    var thirdBlock = new ActionBlock<string>(
        s => Console.WriteLine(s),
        new ExecutionDataflowBlockOptions
        {
            // limit to a single task to watch the order
            MaxDegreeOfParallelism = 1,
        });

    firstBlock.LinkTo(secondBlock, new DataflowLinkOptions { PropagateCompletion = true });
    secondBlock.LinkTo(thirdBlock, new DataflowLinkOptions { PropagateCompletion = true });

    foreach (var x in Enumerable.Range(1, 15))
    {
        // to ensure order of items
        firstBlock.Post(x);
    }

    firstBlock.Complete();
    thirdBlock.Completion.Wait();

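Output:

    Start 31 Finished 31: Message is 1
    Start 31 Finished 31: Message is 2
    Start 31 Finished 5031: Message is 3 (This is delayed message!)
    Start 31 Finished 31: Message is 4
    Start 31 Finished 31: Message is 5
    Start 31 Finished 31: Message is 6
    Start 31 Finished 31: Message is 7
    Start 31 Finished 31: Message is 8
    Start 31 Finished 31: Message is 9
    Start 31 Finished 31: Message is 10
    Start 31 Finished 31: Message is 11
    Start 31 Finished 31: Message is 12
    Start 5031 Finished 5031: Message is 13
    Start 5031 Finished 5031: Message is 14
    Start 5031 Finished 5031: Message is 15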

Solution 1

Don't use DataFlow for the download part, because the ordering guarantee blocks the kind of processing you are looking for.

    // Fragment for the body of Main; requires the namespaces System, System.Linq,
    // System.Threading, System.Threading.Tasks and System.Threading.Tasks.Dataflow.
    var ts = Environment.TickCount;

    var thirdBlock = new ActionBlock<string>(
        s => Console.WriteLine(s),
        new ExecutionDataflowBlockOptions
        {
            // up to four tasks may write concurrently
            MaxDegreeOfParallelism = 4,
        });

    // The work is done by Parallel.ForEach instead of a dataflow block,
    // so each result is posted as soon as it is ready.
    Parallel.ForEach(
        Enumerable.Range(1, 15),
        new ParallelOptions { MaxDegreeOfParallelism = 4 },
        x =>
        {
            var start = Environment.TickCount;
            string result;
            if (x == 12)
            {
                Thread.Sleep(5000);
                result = $"Start {start - ts} Finished {Environment.TickCount - ts}: Message is {x} (This is delayed message!) ";
            }
            else
            {
                result = $"Start {start - ts} Finished {Environment.TickCount - ts}: Message is {x}";
            }
            thirdBlock.Post(result);
        });

    thirdBlock.Complete();
    thirdBlock.Completion.Wait();

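Output:

    Start 32 Finished 32: Message is 2
    Start 32 Finished 32: Message is 6
    Start 32 Finished 32: Message is 5
    Start 32 Finished 32: Message is 8
    Start 32 Finished 32: Message is 9
    Start 32 Finished 32: Message is 10
    Start 32 Finished 32: Message is 11
    Start 32 Finished 32: Message is 7
    Start 32 Finished 32: Message is 13
    Start 32 Finished 32: Message is 14
    Start 32 Finished 32: Message is 15
    Start 32 Finished 32: Message is 3
    Start 32 Finished 32: Message is 4
    Start 32 Finished 32: Message is 1
    Start 32 Finished 5032: Message is 12 (This is delayed message!)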

Solution 2

Of course, you can implement IPropagatorBlock in a custom class that does not guarantee the order of the items.
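One way such a block could look is sketched below. Rather than implementing IPropagatorBlock by hand, it combines an ActionBlock and a BufferBlock with DataflowBlock.Encapsulate, which is a swapped-in shortcut and not necessarily what a full custom class would do. The names UnorderedBlock, Create, and UnorderedBlockDemo are hypothetical, as are the delays used in the demo.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class UnorderedBlock
{
    // Hypothetical helper (not part of TPL Dataflow): a propagator that emits
    // each result as soon as it is ready, without restoring input order.
    public static IPropagatorBlock<TIn, TOut> Create<TIn, TOut>(
        Func<TIn, Task<TOut>> transform, int maxDegreeOfParallelism)
    {
        var output = new BufferBlock<TOut>();

        // The worker runs the transform in parallel and forwards every result
        // straight into the output buffer, in completion order.
        var worker = new ActionBlock<TIn>(async item =>
        {
            var result = await transform(item);
            await output.SendAsync(result);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism });

        // Forward completion (or failure) of the worker to the output buffer.
        worker.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted) ((IDataflowBlock)output).Fault(t.Exception);
            else output.Complete();
        }, TaskScheduler.Default);

        return DataflowBlock.Encapsulate(worker, output);
    }
}

public static class UnorderedBlockDemo
{
    // Pushes items 1..5 through the block (item 1 is slow) and records arrival order.
    public static async Task<List<int>> RunAsync()
    {
        var seen = new List<int>();

        var block = UnorderedBlock.Create<int, int>(async x =>
        {
            await Task.Delay(x == 1 ? 500 : 10);
            return x;
        }, 4);

        var collect = new ActionBlock<int>(x => seen.Add(x));
        block.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var x in Enumerable.Range(1, 5)) block.Post(x);
        block.Complete();

        await collect.Completion;
        return seen;
    }

    public static void Main() =>
        Console.WriteLine(string.Join(", ", RunAsync().GetAwaiter().GetResult()));
}
```

The block can be linked like any other propagator. Since results are forwarded in completion order, the slow item normally shows up last, and the fast items are no longer held back behind it.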

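Looking at the timestamps, the output of the second block is working just as you expect: the delayed TransformBlock runs after all the other TransformBlocks. It appears to be the Console.WriteLine in the ActionBlock that is not being called in the order you expect.

Your code has secondBlock.Completion.Wait(); isn't that incorrect? Shouldn't it be thirdBlock.Completion.Wait(); to get the result you expect?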
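To illustrate the point about waiting on the right block, here is a minimal sketch of a three-block chain that completes the first block, propagates completion through the links, and waits on the last block. The class name CompletionDemo and the counter are made up for the example; the block layout only loosely mirrors the question's code.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionDemo
{
    // Sends 15 items through the chain and returns how many the last block handled.
    public static async Task<int> RunAsync()
    {
        var processed = 0;
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        var first = new TransformBlock<int, int>(x => x);
        var second = new TransformBlock<int, string>(x => $"Message is {x}");

        // Default MaxDegreeOfParallelism is 1, so the counter is updated by one task at a time.
        var last = new ActionBlock<string>(s =>
        {
            Console.WriteLine(s);
            processed++;
        });

        // With PropagateCompletion, completing 'first' eventually completes 'last'.
        first.LinkTo(second, linkOptions);
        second.LinkTo(last, linkOptions);

        foreach (var x in Enumerable.Range(1, 15)) await first.SendAsync(x);

        // Signal that no more input is coming, then wait for the final block.
        first.Complete();
        await last.Completion;

        return processed;
    }

    public static void Main() =>
        Console.WriteLine($"Processed {RunAsync().GetAwaiter().GetResult()} items");
}
```

Waiting on the last block's Completion only returns after every item has been written, whereas waiting on an intermediate block says nothing about what the blocks after it have done.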