使用线程处理队列的最有效方法

我有一个队列,其上放置了待处理的傅立叶变换请求(相对耗时的操作) – 在某些情况下,我们每秒可以获得数千个变换请求,因此它必须快速。

我正在升级旧代码以使用.net 4,以及移植到TPL。 我想知道处理这个队列的最有效(最快的吞吐量)方式是什么样的。 我想使用所有可用的核心。

目前我正在尝试使用BlockingCollection。 我创建了一个队列处理程序类,它产生了4个任务,这些任务阻塞了BlockingCollection并等待传入​​的工作。 然后他们处理该挂起的转换。 码:

public class IncomingPacketQueue : IDisposable { BlockingCollection _packetQ = new BlockingCollection(); public IncomingPacketQueue(int workerCount) { for (int i = 0; i < workerCount; i++) { Task.Factory.StartNew(Consume); } } public void EnqueueSweep(IncomingPacket incoming) { _packetQ.Add(incoming); } private void Consume() { foreach (var sweep in _packetQ.GetConsumingEnumerable()) { //do stuff var worker = new IfftWorker(); Trace.WriteLine(" Thread {0} picking up a pending ifft".With(Thread.CurrentThread.ManagedThreadId)); worker.DoIfft(sweep); } } public int QueueCount { get { return _packetQ.Count; } } #region IDisposable Members public void Dispose() { _packetQ.CompleteAdding(); } #endregion } 

这看起来像一个很好的解决方案吗? 它似乎最大化了所有核心 – 虽然我目前不确定我应该在我的构造函数中产生多少工人。

这看起来很合理。 我发现BlockingCollection非常快。 我用它来处理每秒数万个请求。

如果您的应用程序受处理器限制,那么您可能不希望创建比核心更多的工作者。 当然,您不希望创建比核心更多的工作者。 在四核机器上,如果您希望大部分时间花在FFT上,那么四个工作人员将占用所有CPU。 更多的工作者只是意味着你有更多的线程上下文切换来处理。 TPL通常会为您平衡这一点,但是当您无法处理超过少数人时,没有理由创建100名工人。

我建议你用3,4,5,6,7和8名工人进行测试。 看看哪一个给你最好的吞吐量。

我同意吉姆的观点。 你的方法看起来非常好。 你不会变得更好。 我不是FFT专家,但我假设这些操作几乎100%受CPU限制。 如果情况确实如此,那么对工人数量的良好初步猜测将是与机器中核心数量的直接一对一关联。 您可以使用Environment.ProcessorCount来获取此值。 您可以尝试使用2x或4x的乘数,但同样,如果这些操作受CPU限制,则高于1x的任何操作都可能导致更多开销。 使用Environment.ProcessorCount可以使您的代码更具可移植性。

另一个建议……让TPL知道这些是专用线程。 您可以通过指定LongRunning选项来执行此LongRunning

 public IncomingPacketQueue() { for (int i = 0; i < Environment.ProcessorCount; i++) { Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning); } } 

为什么不使用Parallel.ForEach并让TPL处理创建的线程数。

  Parallel.ForEach(BlockingCollectionExtensions.GetConsumingPartitioneenter(_packetQ), sweep => { //do stuff var worker = new IfftWorker(); Trace.WriteLine(" Thread {0} picking up a pending ifft".With(Thread.CurrentThread.ManagedThreadId)); worker.DoIfft(sweep); }); 

(GetConsumingPartitioner是ParallelExtensionsExtras的一部分)

使工作人员数量可配置。 也有太多的工人,它会变慢(如另一张海报所示),所以你需要找到最佳位置。 可配置的值允许测试运行以找到最佳值,或允许您的程序适应不同类型的硬件。 你肯定可以把这个值放在App.Config中并在启动时阅读它。

您还可以尝试使用PLINQ并行化处理,以查看它与您当前使用的方法的比较。 它有一些技巧可以使它在某些情况下非常有效。

 _packetQ.GetConsumingEnumerable().AsParallel().ForAll( sweep => new IfftWorker().DoIfft(sweep));