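How do I arrange flow control in TPL Dataflow?

I'm trying to get my head around controlling the flow of data in TPL Dataflow. I have a very fast producer and a very slow consumer. (My real code is more complex, but this is a pretty good model and it reproduces the problem.)

When I run it, the code starts drinking memory like it's going out of style, and the producer's output queue fills up as fast as it can. What I'd really like to see is the producer stop running for a while, until the consumer has a chance to ask for more. From my reading of the documentation, this is what is supposed to happen: that is, I thought the producer waits until the consumer has space.

Clearly that isn't the case. How do I fix it so that the queue doesn't go crazy?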

    using System;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    namespace MemoryLeakTestCase
    {
        class Program
        {
            static void Main(string[] args)
            {
                // Very fast producer: a single input expands into a billion strings.
                var CreateData = new TransformManyBlock<int, string>(ignore =>
                {
                    return Enumerable.Range(0, 1000 * 1000 * 1000)
                        .Select((s, i) => "Hello, World " + i);
                });

                // Very slow consumer: one second per item, bounded to 1000 items.
                var ParseFile = new TransformManyBlock<string, string>(fileContent =>
                {
                    Thread.Sleep(1000);
                    return Enumerable.Range(0, 100).Select((sst, iii) => "Hello, " + iii);
                }, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1000 });

                var EndOfTheLine = new ActionBlock<string>(f => { });

                var linkOptions = new DataflowLinkOptions { PropagateCompletion = true, };
                CreateData.LinkTo(ParseFile, linkOptions);
                ParseFile.LinkTo(EndOfTheLine, linkOptions);

                // Report the queue sizes once per second.
                Task t = new Task(() =>
                {
                    while (true)
                    {
                        Console.WriteLine("CreateData: " + Report(CreateData));
                        Console.WriteLine("ParseData:  " + Report(ParseFile));
                        Console.WriteLine("NullTarget: " + EndOfTheLine.InputCount);
                        Thread.Sleep(1000);
                    }
                });
                t.Start();

                CreateData.SendAsync(0);
                CreateData.Complete();

                EndOfTheLine.Completion.Wait();
            }

            public static string Report<T, U>(TransformManyBlock<T, U> block)
            {
                return String.Format("INPUT: {0} OUTPUT: {1} ",
                    block.InputCount.ToString().PadLeft(10, ' '),
                    block.OutputCount.ToString().PadLeft(10, ' '));
            }
        }
    }

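Normally, what you would do in a situation like this is to also set BoundedCapacity on the CreateData block. But that won't work here, because TransformManyBlock doesn't seem to take BoundedCapacity into account when it fills its output queue from a single IEnumerable.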
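For reference, this is roughly what BoundedCapacity does when a block honors it. The following is a minimal sketch using a BufferBlock with a capacity of 1; the class and method names (BoundedCapacityDemo, RunAsync) are made up for illustration. Post() is declined while the block is full, and the task returned by SendAsync() stays incomplete until a slot frees up.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not part of the question's code.
public static class BoundedCapacityDemo
{
    // Returns (posted, completedEarly, accepted).
    public static async Task<(bool, bool, bool)> RunAsync()
    {
        var buffer = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 1 });

        // Fills the only slot in the buffer.
        await buffer.SendAsync(1);

        // Post() never waits: the full block declines the item.
        bool posted = buffer.Post(2);

        // SendAsync() is postponed: the task is not complete yet.
        Task<bool> pending = buffer.SendAsync(3);
        bool completedEarly = pending.IsCompleted;

        // Removing an item frees the slot, so the postponed send is accepted.
        buffer.Receive();
        bool accepted = await pending;

        return (posted, completedEarly, accepted);
    }
}
```

Awaiting SendAsync() in a loop is what gives a producer back-pressure: it only advances when the target has room.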

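What you can do instead is create a function that iterates the collection and uses SendAsync() to send more data only when the target can accept it: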

    // Extension method: declare it inside a static class.

    /// <summary>
    /// If iterating data throws an exception, the target block is faulted
    /// and the returned Task completes successfully.
    ///
    /// Depending on the usage, this might or might not be what you want.
    /// </summary>
    public static async Task SendAllAsync<T>(
        this ITargetBlock<T> target, IEnumerable<T> data)
    {
        try
        {
            foreach (var item in data)
            {
                // Waits until the target has room for the item.
                await target.SendAsync(item);
            }
        }
        catch (Exception e)
        {
            target.Fault(e);
        }
    }

Usage:

    var data = Enumerable.Range(0, 1000 * 1000 * 1000).Select((s, i) => "Hello, World " + i);
    await ParseFile.SendAllAsync(data);
    ParseFile.Complete();

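If you still want a CreateData block that behaves like the one in your original code, you could use two bounded BufferBlocks with SendAllAsync() between them, and then use Encapsulate() to make them look like a single block: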

    /// <summary>
    /// boundedCapacity represents the capacity of the input queue
    /// and the output queue separately, not their total.
    /// </summary>
    public static IPropagatorBlock<TInput, TOutput>
        CreateBoundedTransformManyBlock<TInput, TOutput>(
        Func<TInput, IEnumerable<TOutput>> transform, int boundedCapacity)
    {
        var input = new BufferBlock<TInput>(
            new DataflowBlockOptions { BoundedCapacity = boundedCapacity });
        var output = new BufferBlock<TOutput>(
            new DataflowBlockOptions { BoundedCapacity = boundedCapacity });

        // Pump items from input to output, respecting the capacity of output.
        Task.Run(
            async () =>
            {
                try
                {
                    while (await input.OutputAvailableAsync())
                    {
                        var data = transform(await input.ReceiveAsync());

                        await output.SendAllAsync(data);
                    }

                    output.Complete();
                }
                catch (Exception e)
                {
                    ((IDataflowBlock)input).Fault(e);
                    ((IDataflowBlock)output).Fault(e);
                }
            });

        return DataflowBlock.Encapsulate(input, output);
    }