使用工作线程对项目进行排队

我一直试图找出如何解决我的要求,但对于我的生活,我只是无法想出一个解决方案。

我有一个项目数据库,它们存储一种队列。 (数据库已经实现,其他进程将向此队列添加项目。)

这些项目需要大量的工作/时间来“处理”,所以我需要能够:不断地从数据库中排队项目。 对于每个项目运行一个新线程并处理该项目,然后返回true / false它已成功处理。 (这将用于将其重新添加到数据库队列中)

但是,只有当前活动线程数(每个项目正在处理一个)小于最大线程数参数时才执行此操作。

一旦达到最大线程数,我需要停止从数据库中对项目进行排队,直到当前线程数小于最大线程数。 此时它需要继续对项目进行排队。

感觉这应该是我能想到的东西,但它不是来找我。

澄清一下:我只需要实现线程。 该数据库已经实施。

一个非常简单的方法是使用Semaphore 。 您有一个线程使项目出列并创建线程来处理它们。 例如:

 const int MaxThreads = 4; Semaphore sem = new Semaphore(MaxThreads, MaxThreads); while (Queue.HasItems()) { sem.WaitOne(); var item = Queue.Dequeue(); Threadpool.QueueUserWorkItem(ProcessItem, item); // see below } // When the queue is empty, you have to wait for all processing // threads to complete. // If you can acquire the semaphore MaxThreads times, all workers are done int count = 0; while (count < MaxThreads) { sem.WaitOne(); ++count; } // the code to process an item void ProcessItem(object item) { // cast the item to whatever type you need, // and process it. // when done processing, release the semaphore sem.Release(); } 

上述技术效果很好。 编码简单,易于理解且非常有效。

一个变化是您可能希望使用Task API而不是Threadpool.QueueUserWorkItemTask可让您更好地控制异步处理,包括取消。 我在我的例子中使用了QueueUserWorkItem ,因为我对它更熟悉。 我会在生产程序中使用Task

虽然这确实使用了N + 1个线程(其中N是您希望并发处理的项目数),但该额外线程通常不会执行任何操作。 它运行的唯一时间是它将工作分配给工作线程。 否则,它正在对信号量进行非忙等待。

你不知道从哪里开始?

考虑具有最大线程数的线程池。 http://msdn.microsoft.com/en-us/library/y5htx827.aspx

考虑立即调整最大线程数并监控数据库。 http://msdn.microsoft.com/en-us/library/system.threading.threadpool.queueuserworkitem.aspx很方便。

请记住,您不能保证您的流程将安全结束…崩溃发生。 考虑记录处理状态。

请记住,您的select和remove-from-queue操作应该是primefaces操作。

好的,所以解决方案的体系结构将取决于一件事:每个队列项的处理时间是否根据项目的数据而变化?

如果没有那么你可以在处理线程之间只有循环。 实现起来相当简单。

如果处理时间确实不同,那么你将需要一些具有更多“下一个可用”感觉的东西,这样你的任何一个线程碰巧都是免费的,首先得到处理数据项的工作。

在完成这项工作之后,您将通常会遇到如何在队列读取器和处理线程之间进行同步。 “next-available”和“round-robin”之间的区别在于您如何进行同步。

我对C#并不过分熟悉,但我听说过一个叫做背景工作者的野兽。 这可能是一种可以接受的方法。

对于循环法,只需为每个队列项启动后台工作程序,将工作程序的引用存储在数组中。 比如说,16名正在进行的背景工作者。 这个想法是,你已经开始16,然后在开始第17次之前等待第一次完成,依此类推。 我相信后台工作者实际上在线程池上作为作业运行,因此这将自动将任何时候实际运行的线程数限制为适合底层硬件的线程。 等待后台工作人员看到这个 。 等待后台工作人员完成后,然后处理其结果并启动另一个。

对于下一个可用的方法,它没有那么不同。 而不是等待第一个完成,你将使用WaitAny()等待任何工人完成。 你从任何一个完成后处理返回,然后再启动另一个并返回WaitAny()。

两种方法的一般原理是始终保持一些线程沸腾。 下一个可用方法的function是,您发出结果的顺序不一定与输入项的顺序相同。 如果这很重要,那么使用比CPU内核更多的后台工作人员的循环方法将是合理有效的(线程池将开始委托但尚未运行的工作人员)。 但是,延迟将随处理时间而变化。

BTW 16是根据您认为运行该软件的PC上将有多少核心选择的任意数字。 核心数量越多,数量越多。

当然,在看似不安和不断变化的.NET世界中,现在可能有更好的方法来实现这一目标。

祝好运!