TPL Complete vs Completion

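I need to import customer-related data from a legacy database and perform several transformations along the way. This means a single entry has to trigger additional "events" (synchronizing products, creating invoices, etc.).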

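My initial solution was a simple parallel approach. It works fine, but sometimes it misbehaves. If the customers currently being processed have to wait for events of the same type, their processing queues can get stuck and eventually time out, which makes every dependent event fail as well (they depend on the failed one). It doesn't happen all the time, but it's annoying.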

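So I came up with another idea: work in batches. By that I mean limiting not only the number of customers processed at the same time, but also the number of events broadcast to the queue. While looking for ideas, I found this answer, which pointed me to TPL Dataflow.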
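For comparison, TPL Dataflow can also cap how many customers are processed at once without batching, through `ExecutionDataflowBlockOptions.MaxDegreeOfParallelism` and `BoundedCapacity`. This is a hedged sketch of that alternative, not the approach taken below: `ThrottleDemo`, `RunAsync`, the `BoundedCapacity` value, and the 20 ms `Task.Delay` standing in for the import work are all hypothetical, and it assumes the System.Threading.Tasks.Dataflow library is available. It records the peak number of concurrently running items so the cap can be observed.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo: throttle customer imports with MaxDegreeOfParallelism instead of batches.
public static class ThrottleDemo
{
    // Imports customerCount fake customer IDs, at most maxConcurrent at a time,
    // and returns the highest concurrency actually observed.
    public static async Task<int> RunAsync(int customerCount, int maxConcurrent)
    {
        var gate = new object();
        var running = 0;
        var peak = 0;

        var importCustomer = new ActionBlock<int>(async customerId =>
        {
            lock (gate)
            {
                running++;
                peak = Math.Max(peak, running);
            }

            await Task.Delay(20); // stand-in for the real import work (hypothetical)

            lock (gate)
            {
                running--;
            }
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxConcurrent, // customers processed at the same time
            BoundedCapacity = maxConcurrent * 2     // arbitrary cap on queued + running items
        });

        for (var id = 1; id <= customerCount; id++)
        {
            // SendAsync waits while the bounded block is full; Post would return false instead.
            await importCustomer.SendAsync(id);
        }

        importCustomer.Complete();       // no more customers will be sent
        await importCustomer.Completion; // wait until every accepted customer is processed
        return peak;
    }
}
```

Calling `await ThrottleDemo.RunAsync(10, 2)` should never report more than 2, whatever the timing.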

I built a skeleton to get familiar with it. I set up a simple pipeline, but I'm a bit confused about when to call Complete() and when to await Completion.
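The distinction can be seen on a single block: `Complete()` only tells the block that no more input will arrive and returns immediately, while `Completion` is a `Task` that finishes once everything already buffered has been processed. A minimal sketch, assuming the System.Threading.Tasks.Dataflow library; `CompletionDemo` and its return values are illustrative names, not from the original:

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo of Complete() versus Completion on one ActionBlock.
public static class CompletionDemo
{
    // Returns the items that were processed and whether a Post after Complete() was accepted.
    public static async Task<(int[] Processed, bool PostAfterComplete)> RunAsync()
    {
        var processed = new ConcurrentQueue<int>();
        var block = new ActionBlock<int>(async i =>
        {
            await Task.Delay(10); // simulate some work
            processed.Enqueue(i);
        });

        for (var i = 1; i <= 5; i++)
        {
            block.Post(i);
        }

        // Complete(): "no more input". Returns at once; buffered items are still processed.
        block.Complete();

        // After Complete(), new messages are declined, so Post returns false.
        var accepted = block.Post(6);

        // Completion: a Task that finishes after every buffered item has been handled.
        await block.Completion;

        return (processed.ToArray(), accepted);
    }
}
```

With the default `MaxDegreeOfParallelism` of 1, the items come out in posting order (1 to 5), and item 6 is never processed.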

The steps are as follows:

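  1. Create a list of numbers (the IDs of the customers to import). This sits outside the import logic; it's only there to trigger the rest of the logic.
  2. Create a BatchBlock (to be able to limit the number of customers processed at the same time).
  3. Create a single MyClass1 item based on the id (TransformBlock).
  4. Run some logic and produce a collection of MyClass2 (TransformManyBlock); for example, sleep for 1 second.
  5. Run some logic on each item of the collection (ActionBlock); for example, sleep for 1 second.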

Here is the full code:

    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static class Program
    {
        private static void Main(string[] args)
        {
            var batchBlock = new BatchBlock<int>(2);
            for (var i = 1; i < 10; i++)
            {
                batchBlock.Post(i);
            }
            batchBlock.Complete();

            while (batchBlock.TryReceive(null, out var ids))
            {
                var transformBlock = new TransformBlock<int, MyClass1>(delegate (int id)
                {
                    Console.WriteLine($"TransformBlock(id: {id})");
                    return new MyClass1(id, "Star Wars");
                });
                var transformManyBlock = new TransformManyBlock<MyClass1, MyClass2>(delegate (MyClass1 myClass1)
                {
                    Console.WriteLine($"TransformManyBlock(myClass1: {myClass1.Id}|{myClass1.Value})");
                    Thread.Sleep(1000);
                    return GetMyClass22Values(myClass1);
                });
                var actionBlock = new ActionBlock<MyClass2>(delegate (MyClass2 myClass2)
                {
                    Console.WriteLine($"ActionBlock(myClass2: {myClass2.Id}|{myClass2.Value})");
                    Thread.Sleep(1000);
                });

                transformBlock.LinkTo(transformManyBlock);
                transformManyBlock.LinkTo(actionBlock);

                foreach (var id in ids)
                {
                    transformBlock.Post(id);
                }

                // this is the point where I'm not 100% sure
                //transformBlock.Complete();
                //transformManyBlock.Complete();
                //transformManyBlock.Completion.Wait();
                actionBlock.Complete();
                actionBlock.Completion.Wait();
            }

            Console.WriteLine();
            Console.WriteLine("Press any key to continue...");
            Console.ReadKey();
        }

        private static IEnumerable<MyClass2> GetMyClass22Values(MyClass1 myClass1)
        {
            return new List<MyClass2>
            {
                new MyClass2(1, myClass1.Id + " did this"),
                new MyClass2(2, myClass1.Id + " did that"),
                new MyClass2(3, myClass1.Id + " did this again")
            };
        }
    }

    public class MyClass1
    {
        public MyClass1(int id, string value)
        {
            Id = id;
            Value = value;
        }

        public int Id { get; set; }
        public string Value { get; set; }
    }

    public class MyClass2
    {
        public MyClass2(int id, string value)
        {
            Id = id;
            Value = value;
        }

        public int Id { get; set; }
        public string Value { get; set; }
    }

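So what I'm struggling with is the end, where I need to call Complete() or await Completion. I can't seem to find the right combination. I'd like to see output like this: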

    TransformBlock(id: 1)
    TransformBlock(id: 2)
    TransformManyBlock(myClass1: 1|Star Wars)
    TransformManyBlock(myClass1: 2|Star Wars)
    ActionBlock(myClass2: 1|1 did this)
    ActionBlock(myClass2: 2|1 did that)
    ActionBlock(myClass2: 3|1 did this again)
    ActionBlock(myClass2: 1|2 did this)
    ActionBlock(myClass2: 2|2 did that)
    ActionBlock(myClass2: 3|2 did this again)
    TransformBlock(id: 3)
    TransformBlock(id: 4)
    TransformManyBlock(myClass1: 3|Star Wars)
    TransformManyBlock(myClass1: 4|Star Wars)
    ActionBlock(myClass2: 1|3 did this)
    ActionBlock(myClass2: 2|3 did that)
    ActionBlock(myClass2: 3|3 did this again)
    ActionBlock(myClass2: 1|4 did this)
    ActionBlock(myClass2: 2|4 did that)
    ActionBlock(myClass2: 3|4 did this again)
    [the rest of the items]
    Press any key to exit...

Can anyone point me in the right direction?

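You're almost there. You need to call Complete on the first block of the pipeline and then await Completion on the last block. Then, in your links, you need to propagate completion, like this: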

    private static async Task Main(string[] args)
    {
        // the customer IDs to import; in the question these came from the BatchBlock
        var ids = new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };

        var transformBlock = new TransformBlock<int, MyClass1>(delegate (int id)
        {
            Console.WriteLine($"TransformBlock(id: {id})");
            return new MyClass1(id, "Star Wars");
        });
        var transformManyBlock = new TransformManyBlock<MyClass1, MyClass2>(delegate (MyClass1 myClass1)
        {
            Console.WriteLine($"TransformManyBlock(myClass1: {myClass1.Id}|{myClass1.Value})");
            Thread.Sleep(1000);
            return GetMyClass22Values(myClass1);
        });
        var actionBlock = new ActionBlock<MyClass2>(delegate (MyClass2 myClass2)
        {
            Console.WriteLine($"ActionBlock(myClass2: {myClass2.Id}|{myClass2.Value})");
            Thread.Sleep(1000);
        });

        // propagate completion
        transformBlock.LinkTo(transformManyBlock, new DataflowLinkOptions { PropagateCompletion = true });
        transformManyBlock.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var id in ids)
        {
            transformBlock.Post(id);
        }

        // complete the first block
        transformBlock.Complete();

        // wait for completion to flow to the last block
        await actionBlock.Completion;
    }
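Applied to the per-batch loop from the question, the same rule (complete the first block, await the last) makes each batch drain before the next one starts, which gives the batch-by-batch grouping of the desired output. This is a hedged sketch, assuming the System.Threading.Tasks.Dataflow library: string payloads stand in for `MyClass1`/`MyClass2`, `PerBatchPipeline` and `RunAsync` are hypothetical names, the `Thread.Sleep` calls are dropped, and `OutputAvailableAsync` is used so the loop waits for batches instead of relying on them already being available.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical sketch: one fresh pipeline per batch, drained before the next batch starts.
public static class PerBatchPipeline
{
    public static async Task<List<string>> RunAsync(int customerCount, int batchSize)
    {
        var log = new ConcurrentQueue<string>(); // blocks run concurrently, so use a thread-safe log
        var batchBlock = new BatchBlock<int>(batchSize);
        for (var id = 1; id <= customerCount; id++)
        {
            batchBlock.Post(id);
        }
        batchBlock.Complete(); // also emits the final, possibly smaller, batch

        var link = new DataflowLinkOptions { PropagateCompletion = true };

        // OutputAvailableAsync returns false once the block is completed and empty.
        while (await batchBlock.OutputAvailableAsync())
        {
            while (batchBlock.TryReceive(out var ids))
            {
                var transformBlock = new TransformBlock<int, int>(id =>
                {
                    log.Enqueue($"TransformBlock(id: {id})");
                    return id;
                });
                var transformManyBlock = new TransformManyBlock<int, string>(id =>
                {
                    log.Enqueue($"TransformManyBlock(id: {id})");
                    return new[] { $"{id} did this", $"{id} did that", $"{id} did this again" };
                });
                var actionBlock = new ActionBlock<string>(value => log.Enqueue($"ActionBlock({value})"));

                transformBlock.LinkTo(transformManyBlock, link);
                transformManyBlock.LinkTo(actionBlock, link);

                foreach (var id in ids)
                {
                    transformBlock.Post(id);
                }

                transformBlock.Complete();    // complete the first block...
                await actionBlock.Completion; // ...and wait for the last one before the next batch
            }
        }

        return new List<string>(log);
    }
}
```

The trade-off is that a slow customer holds up its whole batch; the linked, unbatched pipeline keeps blocks busy but loses that grouping.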

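You could also incorporate the batch block into the pipeline, and then you would no longer need the TryReceive call, but that seems to be a separate part of your process.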
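One way to read "incorporate the batch block into the pipeline" is to link the `BatchBlock` to a `TransformManyBlock<int[], int>` that flattens each batch back into single IDs, so a single `Complete()` on the batch block flows all the way down. A hedged sketch, assuming the System.Threading.Tasks.Dataflow library; `LinkedBatchPipeline`, `unbatchBlock`, and the string payloads are hypothetical:

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical sketch: BatchBlock linked into the pipeline, so no TryReceive loop is needed.
public static class LinkedBatchPipeline
{
    // Returns how many items reached the final ActionBlock.
    public static async Task<int> RunAsync(int customerCount, int batchSize)
    {
        var processed = 0;
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var batchBlock = new BatchBlock<int>(batchSize);
        var unbatchBlock = new TransformManyBlock<int[], int>(ids => ids); // int[] batch -> single IDs
        var transformBlock = new TransformBlock<int, string>(id => $"{id}|Star Wars");
        var transformManyBlock = new TransformManyBlock<string, string>(value =>
            new[] { value + " did this", value + " did that", value + " did this again" });
        // ActionBlock handles one item at a time by default, so a plain increment is safe here.
        var actionBlock = new ActionBlock<string>(_ => processed++);

        batchBlock.LinkTo(unbatchBlock, link);
        unbatchBlock.LinkTo(transformBlock, link);
        transformBlock.LinkTo(transformManyBlock, link);
        transformManyBlock.LinkTo(actionBlock, link);

        for (var id = 1; id <= customerCount; id++)
        {
            batchBlock.Post(id);
        }

        batchBlock.Complete();        // completion now flows through every link
        await actionBlock.Completion; // the last block finishes after everything upstream
        return processed;
    }
}
```

Note that once the batches are flattened, one batch no longer waits for the previous one to finish; if the goal is to cap concurrent customers, `MaxDegreeOfParallelism`/`BoundedCapacity` on the blocks is the more direct knob.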

EDIT

An example of propagating completion to multiple blocks:

    public static async Task Main(string[] args)
    {
        var sourceBlock = new BufferBlock<int>();
        var processBlock1 = new ActionBlock<int>(i => Console.WriteLine($"Block1 {i}"));
        var processBlock2 = new ActionBlock<int>(i => Console.WriteLine($"Block2 {i}"));

        sourceBlock.LinkTo(processBlock1);
        sourceBlock.LinkTo(processBlock2);

        // forward completion (or the fault) from the source to both targets by hand
        var sourceBlockCompletion = sourceBlock.Completion.ContinueWith(tsk =>
        {
            if (!tsk.IsFaulted)
            {
                processBlock1.Complete();
                processBlock2.Complete();
            }
            else
            {
                ((IDataflowBlock)processBlock1).Fault(tsk.Exception);
                ((IDataflowBlock)processBlock2).Fault(tsk.Exception);
            }
        });

        // Send some data...

        sourceBlock.Complete();
        await Task.WhenAll(sourceBlockCompletion, processBlock1.Completion, processBlock2.Completion);
    }
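Setting `PropagateCompletion = true` on each link also works when one source feeds several targets, so the `ContinueWith` is not strictly required. Keep in mind that a `BufferBlock` hands each message to only one target (the first linked target that accepts it), so in the example above the unbounded `processBlock1` would take every message. If both blocks should see every item, a `BroadcastBlock` is the usual choice. A hedged sketch, assuming the System.Threading.Tasks.Dataflow library; `BroadcastDemo` and the counters are hypothetical:

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical sketch: fan out to two blocks with BroadcastBlock and PropagateCompletion.
public static class BroadcastDemo
{
    // Returns how many items each of the two target blocks processed.
    public static async Task<(int Block1, int Block2)> RunAsync(int itemCount)
    {
        var count1 = 0;
        var count2 = 0;
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        // BroadcastBlock offers each message to every linked target;
        // the cloning function just passes the int through.
        var sourceBlock = new BroadcastBlock<int>(i => i);
        var processBlock1 = new ActionBlock<int>(_ => count1++); // one item at a time by default
        var processBlock2 = new ActionBlock<int>(_ => count2++);

        sourceBlock.LinkTo(processBlock1, link);
        sourceBlock.LinkTo(processBlock2, link);

        for (var i = 0; i < itemCount; i++)
        {
            sourceBlock.Post(i);
        }

        sourceBlock.Complete(); // each link forwards completion (or a fault) to its target
        await Task.WhenAll(processBlock1.Completion, processBlock2.Completion);
        return (count1, count2);
    }
}
```

One caveat: `BroadcastBlock` only keeps the latest message for a target that postpones, so with bounded, busy targets some items can be skipped; the unbounded `ActionBlock`s here accept everything.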