管道,多路复用和无界缓冲

(注意:我使用的是.Net 4, 而不是 .Net 4.5,所以我不能使用TPL的DataflowBlock类。)

TL; DR版本

最后,我只是想找到一种方法来处理使用多个线程的顺序工作项,以便在最终输出中保留它们的顺序,而不需要无限制的输出缓冲区。

动机

我现有的代码提供了一个multithreading机制来处理多个数据块,其中一个I / O绑定线程(“供应商”)负责排队数据块以进行处理。 这些数据块包括工作项。

一个或多个线程(“处理器”)负责一次使一个工作项出列,然后处理这些工作项,然后在将其下一个工作项出列之前将处理后的数据写入输出队列。

最终的I / O绑定线程(“使用者”)负责从输出队列中出列已完成的工作项并将它们写入最终目标。 这些工作项目(并且必须)按照它们入队的顺序编写。 我使用并发优先级队列实现了这一点,其中每个项目的优先级由其源索引定义。

我正在使用这种方案在大数据流上进行一些自定义压缩,其中压缩本身相对较慢但是未压缩数据的读取和压缩数据的写入相对较快(尽管受I / O限制)。

我以64K的相当大的块处理数据,因此管道的开销相对较小。

我目前的解决方案运行良好,但它涉及6年前使用许多同步事件编写的大量自定义代码,并且设计看起来有些笨拙; 因此,我已经开始学术练习,看看它是否可以使用更现代的.Net库进行重写。

新设计

我的新设计使用了BlockingCollection类,并且基于这篇Microsoft文章 。

特别是,请查看标题为使用多个生产者进行负载平衡的部分。 我尝试过使用这种方法,因此我有几个处理任务,每个任务都从共享输入BlockingCollection获取工作项,并将完成的项写入自己的BlockingCollection输出队列。

因为每个处理任务都有自己的输出队列,所以我试图使用BlockingCollection.TakeFromAny()来取消第一个可用的已完成工作项。

多路复用器问题

到目前为止一切顺利,但现在问题来了。 微软文章指出:

差距是个问题。 管道的下一个阶段,即显示图像阶段,需要按顺序显示图像,并且序列中没有间隙。 这是多路复用器的用武之地。使用TakeFromAny方法,多路复用器等待来自两个filter阶段生产者队列的输入。 当图像到达时,多路复用器查看图像的序列号是否是预期序列中的下一个。 如果是,则多路复用器将其传递到显示图像阶段。 如果图像不是序列中的下一个图像,则多路复用器将值保存在内部前瞻缓冲区中,并对没有前瞻值的输入队列重复获取操作。 此算法允许多路复用器以确保顺序排序而不对值进行排序的方式将来自传入生成器队列的输入组合在一起。

好的,所发生的是处理任务可以几乎任何顺序生成完成的项目。 多路复用器负责以正确的顺序输出这些项目。

然而…

想象一下,我们有1000件待处理的物品。 进一步想象一下,由于一些奇怪的原因,第一个项目需要更长的时间来处理所有其他项目的组合。

使用我当前的方案,多路复用器将继续读取和缓冲来自所有处理输出队列的项目,直到它找到它应该输出的下一个项目。 由于它等待的项目(根据我的“想象如果”)仅在处理完所有其他工作项后才出现,我将有效地缓冲整个输入中的所有工作项!

数据量太大,无法实现。 当输出队列达到某个最大大小时(即它是一个有界的输出队列),我需要能够停止处理任务输出已完成的工作项,除非工作项恰好是多路复用器正在等待的工作项。

这就是我有点陷入困境的地方。 我可以想出很多方法来实际实现它,但它们似乎都过于复杂,以至于它们并不比我想要替换的代码更好!

我的问题是什么?

我的问题是:我是否以正确的方式解决这个问题?

我本以为这是一个众所周知的问题,但是我的研究只发现了一些文章,这些文章似乎忽略了如果工作项与其他所有工作项相比需要很长时间而发生的无限缓冲问题。

任何人都可以指出任何描述实现这一目标的合理方法的文章吗?

TL; DR版本

最后,我只是想找到一种方法来处理使用多个线程的顺序工作项,以便在最终输出中保留它们的顺序,而不需要无限制的输出缓冲区。

比如,在启动时创建一个项目池,1000。 将它们存储在BlockingCollection上 – 一个’池队列’。

供应商从池队列中获取项目,从文件中加载项目,加载序列号/任何内容并将它们提交给处理器线程池。

处理器完成它们的工作并将输出发送到多路复用器。 多路复用器完成存储任何无序项目的工作,直到处理了早期项目。

当一个项目被多路复用器输出的任何内容完全占用时,它们将返回到池队列以供供应商重用。

如果一个“慢项目”确实需要大量处理,多路复用器中的无序收集将随着“快速项目”在其他池线程上滑落而增长,但因为多路复用器实际上并未将其项目提供给它的输出,池队列没有被补充。

当池清空时,供应商将阻止它并且将无法再提供任何物品。

处理池输入中剩余的“快速项目”将被处理,然后处理将停止,除了“慢项目”。 供应商被阻止,多路复用器在其集合中具有[poolSize-1]项。 没有使用额外的内存,没有浪费CPU,唯一发生的事情是处理’慢项’。

当“慢速项目”最终完成时,它将输出到多路复用器。

多路复用器现在可以按所需的顺序输出所有[poolSize]项。 当这些项目被消耗时,池会再次被填满,供应商现在能够从池中获取项目,然后再次读取其文件,将排队项目读取到处理器池。

自动调节,无需有限缓冲,无内存失控。

编辑:我的意思是“不需要有限的缓冲区”:)

此外,没有GC持有 – 由于项目被重复使用,他们不需要GC。

我想你误解了这篇文章。 根据描述,它没有无界缓冲区,每个队列的look-ahread缓冲区中最多只有一个值。 当您使一个不是下一个值的值出列时,将其保存,然后仅等待缓冲区中没有值的队列。 (如果您有多个输入缓冲区,则逻辑必须更复杂,或者您需要一个包含2个队列多路复用器的树。)

如果将此与已指定有界容量的BlockingCollection结合使用,则可以获得所需的行为:如果一个生产者太慢,则其他生成器将暂停,直到慢速线程赶上。

您是否考虑过不使用手动生产者/消费者缓冲而是使用.AsParallel().AsOrdered() PLINQ替代? 从语义上讲,这正是您想要的 – 一系列并行处理但在输出中排序的项目。 你的代码看起来很简单……

 var orderedOutput = ReadSequentialBlocks() .AsParallel() .AsOrdered() .Select(ProcessBlock) foreach(var item in orderedOutput) Sink(item); 

默认的并行度是计算机上的处理器数量,但您可以对其进行调整。 有一个自动输出缓冲区。 如果默认缓冲消耗的资源太多,您可以将其关闭:

 .WithMergeOptions(ParallelMergeOptions.NotBuffered) 

但是,我肯定会先给出简单朴素的版本 – 你永远不知道,它可能只是开箱即用。 最后,如果您想要自动多路复用的简单性但是大于零的非自动缓冲区,您可以始终使用PLINQ查询来填充固定大小的BlockingCollection<> ,在另一个线程上使用可枚举的消息来读取。

跟进

为了完整起见,这是我最后编写的代码。 感谢Martin James的回答,为解决方案提供了基础。

我对多路复用器仍然不满意(参见ParallelWorkProcessor.multiplex() )。 它有效,但看起来有点笨拙。

我使用Martin James关于工作池的想法来防止多路复用器缓冲区的无限增长,但是我用SemaphoreSlim替换了工作池队列(因为它提供了相同的function,但它使用起来更简单并且使用更少的资源)。

工作任务将完成的项目写入并发优先级队列。 这使我能够轻松有效地找到要输出的下一个项目。

我使用了Microsoft的一个示例并发优先级队列 ,进行了修改以提供一个自动复位事件,该事件在新项目入队时发出信号。

这是ParallelWorkProcessor类。 你通过提供三个代表来使用它; 一个用于提供工作项,一个用于处理工作项,另一个用于输出已完成的工作项。

 using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics.Contracts; using System.Threading; using System.Threading.Tasks; namespace Demo { public sealed class ParallelWorkProcessor where T: class // T is the work item type. { public delegate T Read(); // Called by only one thread. public delegate T Process(T block); // Called simultaneously by multiple threads. public delegate void Write(T block); // Called by only one thread. public ParallelWorkProcessor(Read read, Process process, Write write, int numWorkers = 0) { _read = read; _process = process; _write = write; numWorkers = (numWorkers > 0) ? numWorkers : Environment.ProcessorCount; _workPool = new SemaphoreSlim(numWorkers*2); _inputQueue = new BlockingCollection(numWorkers); _outputQueue = new ConcurrentPriorityQueue(); _workers = new Task[numWorkers]; startWorkers(); Task.Factory.StartNew(enqueueWorkItems); _multiplexor = Task.Factory.StartNew(multiplex); } private void startWorkers() { for (int i = 0; i < _workers.Length; ++i) { _workers[i] = Task.Factory.StartNew(processBlocks); } } private void enqueueWorkItems() { int index = 0; while (true) { T data = _read(); if (data == null) // Signals end of input. { _inputQueue.CompleteAdding(); _outputQueue.Enqueue(index, null); // Special sentinel WorkItem . break; } _workPool.Wait(); _inputQueue.Add(new WorkItem(data, index++)); } } private void multiplex() { int index = 0; // Next required index. int last = int.MaxValue; while (index != last) { KeyValuePair workItem; _outputQueue.WaitForNewItem(); // There will always be at least one item - the sentinel item. while ((index != last) && _outputQueue.TryPeek(out workItem)) { if (workItem.Value == null) // The sentinel item has a null value to indicate that it's the sentinel. { last = workItem.Key; // The sentinel's key is the index of the last block + 1. } else if (workItem.Key == index) // Is this block the next one that we want? { // Even if new items are added to the queue while we're here, the new items will be lower priority. // Therefore it is safe to assume that the item we will dequeue now is the same one we peeked at. _outputQueue.TryDequeue(out workItem); Contract.Assume(workItem.Key == index); // This *must* be the case. _workPool.Release(); // Allow the enqueuer to queue another work item. _write(workItem.Value); ++index; } else // If it's not the block we want, we know we'll get a new item at some point. { _outputQueue.WaitForNewItem(); } } } } private void processBlocks() { foreach (var block in _inputQueue.GetConsumingEnumerable()) { var processedData = _process(block.Data); _outputQueue.Enqueue(block.Index, processedData); } } public bool WaitForFinished(int maxMillisecondsToWait) // Can be Timeout.Infinite. { return _multiplexor.Wait(maxMillisecondsToWait); } private sealed class WorkItem { public WorkItem(T data, int index) { Data = data; Index = index; } public T Data { get; private set; } public int Index { get; private set; } } private readonly Task[] _workers; private readonly Task _multiplexor; private readonly SemaphoreSlim _workPool; private readonly BlockingCollection _inputQueue; private readonly ConcurrentPriorityQueue _outputQueue; private readonly Read _read; private readonly Process _process; private readonly Write _write; } } 

这是我的测试代码:

 using System; using System.Diagnostics; using System.Threading; namespace Demo { public static class Program { private static void Main(string[] args) { _rng = new Random(34324); int threadCount = 8; _maxBlocks = 200; ThreadPool.SetMinThreads(threadCount + 2, 4); // Kludge to prevent slow thread startup. var stopwatch = new Stopwatch(); _numBlocks = _maxBlocks; stopwatch.Restart(); var processor = new ParallelWorkProcessor(read, process, write, threadCount); processor.WaitForFinished(Timeout.Infinite); Console.WriteLine("\n\nFinished in " + stopwatch.Elapsed + "\n\n"); } private static byte[] read() { if (_numBlocks-- == 0) { return null; } var result = new byte[128]; result[0] = (byte)(_maxBlocks-_numBlocks); Console.WriteLine("Supplied input: " + result[0]); return result; } private static byte[] process(byte[] data) { if (data[0] == 10) // Hack for test purposes. Make it REALLY slow for this item! { Console.WriteLine("Delaying a call to process() for 5s for ID 10"); Thread.Sleep(5000); } Thread.Sleep(10 + _rng.Next(50)); Console.WriteLine("Processed: " + data[0]); return data; } private static void write(byte[] data) { Console.WriteLine("Received output: " + data[0]); } private static Random _rng; private static int _numBlocks; private static int _maxBlocks; } }