数据流与拆分工作到小作业,然后再次分组

我需要做这样的工作:

  1. 从数据库中获取Page对象
  2. 为每个页面获取所有图像并处理它们(IO绑定,例如,上传到CDN)
  3. 如果所有图像都成功完成,则将页面标记为在数据库中处理

由于我需要控制并行处理的页数,我决定使用TPL数据流:

____________________________ | Data pipe | | BufferBlock | | BoundedCapacity = 1 | |____________________________| | ____________________________ | Process images | | TransformBlock | | BoundedCapacity = 1 | | MaxDegreeOfParallelism = 8 | |____________________________| | ____________________________ | Save page | | ActionBlock | | BoundedCapacity = 1 | | MaxDegreeOfParallelism = 5 | |____________________________| 

现在我需要“过程图像”来并行处理图像,但我想限制我目前在工作中所有并行页面处理的图像数量。

我可以将TrasnformManyBlock用于“过程图像”,但如何在“保存页面”块中将它们收回?

  ____________________________ | Data pipe | | BufferBlock | | BoundedCapacity = 1 | |____________________________| | ___________________________________ | Load images | | TransformManyBlock | | BoundedCapacity = 1 | | MaxDegreeOfParallelism = 8 | |___________________________________| / | \ ______________________________________________ _|____________________________________________ | | Process image | | | TransformBlock | | | BoundedCapacity = 1 | | | MaxDegreeOfParallelism = 8 |_| |______________________________________________| \ | / How to group images by page ? | ____________________________ | Save page | | ActionBlock | | BoundedCapacity = 1 | | MaxDegreeOfParallelism = 5 | |____________________________| 

最重要的是,其中一个图像可能无法继续,我不想保存包含失败图像的页面。

您可以在给定页面的图像到达时通过记录将图像分组在一起,然后在所有图像到达时发送页面。 为了解决这个问题,页面需要知道它包含多少图像,但我想你知道这一点。

在代码中,它看起来像这样:

 public static IPropagatorBlock CreaterMergerBlock( Func getMergedFunc, Func getSplitCount) { var dictionary = new Dictionary(); return new TransformManyBlock( split => { var merged = getMergedFunc(split); int count; dictionary.TryGetValue(merged, out count); count++; if (getSplitCount(merged) == count) { dictionary.Remove(merged); return new[] { merged }; } dictionary[merged] = count; return new TMerged[0]; }); } 

用法:

 var dataPipe = new BufferBlock(); var splitter = new TransformManyBlock( page => page.LoadImages(), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 }); var processImage = new TransformBlock( image => { // process the image here return image; }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 }); var merger = CreaterMergerBlock( (ImageWithPage image) => image.Page, page => page.ImageCount); var savePage = new ActionBlock( page => /* save the page here */, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 }); dataPipe.LinkTo(splitter); splitter.LinkTo(processImage); processImage.LinkTo(merger); merger.LinkTo(savePage); 

考虑将“加载图像”和“处理图像”合并到一个TransformBlock块中。 这样,您可以毫不费力地将单个页面的图像保持在一起。

为了实现并发限制目标,请使用SemaphoreSlim

 SemaphoreSlim processImageDopLimiter = new SemaphoreSlim(8); //... var page = ...; //TransformBlock block input var images = GetImages(page); ImageWithPage[] processedImages = images .AsParallel() .Select(i => { processImageDopLimiter.WaitOne(); var result = ProcessImage(i); processImageDopLimiter.ReleaseOne(); return result; }) .ToList(); return new { page, processedImages }; 

这将导致相当多的线程被阻塞等待。 如果您愿意,可以使用此处理的异步版本。 这对这个问题来说并不重要。