How do I arrange flow control in TPL Dataflow?
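I'm trying to get my head around controlling data flow in TPL Dataflow. I have a very fast producer and a very slow consumer. (My real code is more complex, but this is a pretty good model, and it reproduces the problem.)
When I run it, the code starts drinking memory like it is going out of style, and the producer's output queue fills up as fast as it can. What I would really like to see is the producer stop running for a while, until the consumer has had a chance to ask for more. From my reading of the documentation, this is what is supposed to happen: that is, I thought the producer waits until the consumer has space.
Clearly that is not the case. How do I fix it so that the queue doesn't go crazy?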
    using System;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    namespace MemoryLeakTestCase
    {
        class Program
        {
            static void Main(string[] args)
            {
                // Fast producer: one input item expands into a billion strings.
                var CreateData = new TransformManyBlock<int, string>(ignore =>
                {
                    return Enumerable.Range(0, 1000 * 1000 * 1000).Select((s, i) => "Hello, World " + i);
                });

                // Slow consumer: takes one second per item, input bounded to 1000 items.
                var ParseFile = new TransformManyBlock<string, string>(fileContent =>
                {
                    Thread.Sleep(1000);
                    return Enumerable.Range(0, 100).Select((sst, iii) => "Hello, " + iii);
                }, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1000 });

                var EndOfTheLine = new ActionBlock<string>(f => { });

                var linkOptions = new DataflowLinkOptions { PropagateCompletion = true, };
                CreateData.LinkTo(ParseFile, linkOptions);
                ParseFile.LinkTo(EndOfTheLine, linkOptions);

                // Print the queue sizes once per second.
                Task t = new Task(() =>
                {
                    while (true)
                    {
                        Console.WriteLine("CreateData: " + Report(CreateData));
                        Console.WriteLine("ParseData:  " + Report(ParseFile));
                        Console.WriteLine("NullTarget: " + EndOfTheLine.InputCount);
                        Thread.Sleep(1000);
                    }
                });
                t.Start();

                CreateData.SendAsync(0);
                CreateData.Complete();

                EndOfTheLine.Completion.Wait();
            }

            public static string Report<T, U>(TransformManyBlock<T, U> block)
            {
                return String.Format("INPUT: {0} OUTPUT: {1} ",
                    block.InputCount.ToString().PadLeft(10, ' '),
                    block.OutputCount.ToString().PadLeft(10, ' '));
            }
        }
    }
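Normally, what you would do in a situation like this is to also set BoundedCapacity on the CreateData block. But that won't work here, because TransformManyBlock doesn't seem to take BoundedCapacity into account when it fills its output queue from a single IEnumerable.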
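For background on the back-pressure that bounded blocks do apply at their input, here is a minimal sketch. The block name `slow`, the capacity of 2 and the 200 ms delay are made up for illustration and do not come from the question. It relies on `Post()` returning false immediately when a bounded block is full, while the task returned by `SendAsync()` stays incomplete until the block accepts or declines the item.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BackpressureDemo
{
    public static async Task<(bool ThirdPost, bool SendPending, bool SendAccepted)> RunAsync()
    {
        // Hypothetical slow consumer: 200 ms per item, room for 2 items in total
        // (the item being processed counts toward BoundedCapacity).
        var slow = new ActionBlock<int>(
            async _ => await Task.Delay(200),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 2 });

        slow.Post(1);
        slow.Post(2);

        // The block is full, so Post() is declined right away instead of waiting.
        bool thirdPost = slow.Post(3);

        // SendAsync() does not block either, but its task only completes
        // once the block has room for the item (or declines it for good).
        Task<bool> send = slow.SendAsync(3);
        bool sendPending = !send.IsCompleted;
        bool sendAccepted = await send;

        slow.Complete();
        await slow.Completion;
        return (thirdPost, sendPending, sendAccepted);
    }

    public static async Task Main()
    {
        var result = await RunAsync();
        Console.WriteLine(result);
    }
}
```

Awaiting `SendAsync()` one item at a time is what lets a producer slow down to the pace of a bounded consumer.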
What you can do instead is create a function that iterates the collection and uses SendAsync() to send more data only when the target can accept it:
    /// <remarks>
    /// If iterating data throws an exception, the target block is faulted
    /// and the returned Task completes successfully.
    ///
    /// Depending on the usage, this might or might not be what you want.
    /// </remarks>
    public static async Task SendAllAsync<T>(
        this ITargetBlock<T> target, IEnumerable<T> data)
    {
        try
        {
            foreach (var item in data)
            {
                // Waits here until the target has room for the item.
                await target.SendAsync(item);
            }
        }
        catch (Exception e)
        {
            target.Fault(e);
        }
    }
Usage:
    var data = Enumerable.Range(0, 1000 * 1000 * 1000).Select((s, i) => "Hello, World " + i);
    await ParseFile.SendAllAsync(data);
    ParseFile.Complete();
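As a rough way to check that this approach keeps the queue bounded, the following sketch feeds a lazy sequence into a slow, bounded ActionBlock and records the largest input queue length seen while items are being pulled. The names `DataflowExtensions`, `SendAllDemo`, `RunAsync`, the item count and the delay are assumptions chosen for illustration. The helper is repeated so the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowExtensions
{
    // Same helper as in the answer, repeated to keep the sketch self-contained.
    public static async Task SendAllAsync<T>(
        this ITargetBlock<T> target, IEnumerable<T> data)
    {
        try
        {
            foreach (var item in data)
                await target.SendAsync(item);
        }
        catch (Exception e)
        {
            target.Fault(e);
        }
    }
}

public static class SendAllDemo
{
    // Returns how many items were processed and the longest input queue observed.
    public static async Task<(int Processed, int MaxQueued)> RunAsync(int itemCount, int capacity)
    {
        int processed = 0;
        var consumer = new ActionBlock<string>(
            async s => { await Task.Delay(5); processed++; },
            new ExecutionDataflowBlockOptions { BoundedCapacity = capacity });

        int maxQueued = 0;
        // Lazy sequence: each element is only produced when SendAllAsync asks for it,
        // so sampling InputCount here shows how far ahead the producer gets.
        IEnumerable<string> data = Enumerable.Range(0, itemCount).Select(i =>
        {
            maxQueued = Math.Max(maxQueued, consumer.InputCount);
            return "Hello, World " + i;
        });

        await consumer.SendAllAsync(data);
        consumer.Complete();
        await consumer.Completion;
        return (processed, maxQueued);
    }

    public static async Task Main()
    {
        var (processed, maxQueued) = await RunAsync(itemCount: 50, capacity: 5);
        Console.WriteLine("Processed: " + processed + ", longest queue seen: " + maxQueued);
    }
}
```

Because the sequence is only enumerated as fast as the consumer accepts items, memory use should stay proportional to the capacity rather than to the length of the sequence.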
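If you still want a CreateData block that behaves like the one in your original code, you could use two bounded BufferBlocks with SendAllAsync() between them, and then use Encapsulate() to make them look like a single block: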
    /// <remarks>
    /// boundedCapacity represents the capacity of the input queue
    /// and the output queue separately, not their total.
    /// </remarks>
    public static IPropagatorBlock<TInput, TOutput>
        CreateBoundedTransformManyBlock<TInput, TOutput>(
        Func<TInput, IEnumerable<TOutput>> transform, int boundedCapacity)
    {
        var input = new BufferBlock<TInput>(
            new DataflowBlockOptions { BoundedCapacity = boundedCapacity });
        var output = new BufferBlock<TOutput>(
            new DataflowBlockOptions { BoundedCapacity = boundedCapacity });

        // Pump items from input to output, one transformed sequence at a time.
        Task.Run(
            async () =>
            {
                try
                {
                    while (await input.OutputAvailableAsync())
                    {
                        var data = transform(await input.ReceiveAsync());

                        await output.SendAllAsync(data);
                    }

                    output.Complete();
                }
                catch (Exception e)
                {
                    ((IDataflowBlock)input).Fault(e);
                    ((IDataflowBlock)output).Fault(e);
                }
            });

        return DataflowBlock.Encapsulate(input, output);
    }
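A possible way to wire the encapsulated block into a pipeline like the one in the question is sketched below. The class name `BoundedBlocks`, the method `RunAsync`, the capacities and the item count are assumptions for illustration. Both helpers are repeated so the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedBlocks
{
    // Both helpers are repeated from the answer so the sketch is self-contained.
    public static async Task SendAllAsync<T>(this ITargetBlock<T> target, IEnumerable<T> data)
    {
        try { foreach (var item in data) await target.SendAsync(item); }
        catch (Exception e) { target.Fault(e); }
    }

    public static IPropagatorBlock<TInput, TOutput> CreateBoundedTransformManyBlock<TInput, TOutput>(
        Func<TInput, IEnumerable<TOutput>> transform, int boundedCapacity)
    {
        var input = new BufferBlock<TInput>(
            new DataflowBlockOptions { BoundedCapacity = boundedCapacity });
        var output = new BufferBlock<TOutput>(
            new DataflowBlockOptions { BoundedCapacity = boundedCapacity });

        Task.Run(async () =>
        {
            try
            {
                while (await input.OutputAvailableAsync())
                    await output.SendAllAsync(transform(await input.ReceiveAsync()));
                output.Complete();
            }
            catch (Exception e)
            {
                ((IDataflowBlock)input).Fault(e);
                ((IDataflowBlock)output).Fault(e);
            }
        });

        return DataflowBlock.Encapsulate(input, output);
    }

    // Returns the number of items that reached the end of the pipeline.
    public static async Task<int> RunAsync(int itemCount)
    {
        var createData = CreateBoundedTransformManyBlock<int, string>(
            _ => Enumerable.Range(0, itemCount).Select(i => "Hello, World " + i),
            boundedCapacity: 10);

        int received = 0;
        // Slow, bounded consumer standing in for ParseFile and EndOfTheLine.
        var endOfTheLine = new ActionBlock<string>(
            async s => { await Task.Delay(1); received++; },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });

        createData.LinkTo(endOfTheLine, new DataflowLinkOptions { PropagateCompletion = true });

        await createData.SendAsync(0);
        createData.Complete();
        await endOfTheLine.Completion;
        return received;
    }

    public static async Task Main()
    {
        Console.WriteLine("Items received: " + await RunAsync(100));
    }
}
```

Completion flows through the pair because Encapsulate() forwards Complete() to the input buffer and exposes the output buffer's Completion, so PropagateCompletion on the link behaves as it does for a built-in block.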