串行任务执行器; 这个线程安全吗?

我使用ThreadPool作为执行手段,创建了一个允许异步顺序执行任务的类。 我的想法是,我将在后台运行串行任务的多个实例,但我不希望每个实例都有一个单独的专用线程。 我想检查的是这个类是否真的是线程安全的。 这是相当简短的,所以我想我会由专家在这里运行它,以防我遗漏了一些明显的东西。 我省略了一些针对不同Action类型的方便重载。

///  /// This class wraps ThreadPool.QueueUserWorkItem, but providing guaranteed ordering of queued tasks for this instance. /// Only one task in the queue will execute at a time, with the order of execution matching the order of addition. /// This is designed as a lighter-weight alternative to using a dedicated Thread for processing of sequential tasks. ///  public sealed class SerialAsyncTasker { private readonly Queue mTasks = new Queue(); private bool mTaskExecuting; ///  /// Queue a new task for asynchronous execution on the thread pool. ///  /// Task to execute public void QueueTask(Action task) { if (task == null) throw new ArgumentNullException("task"); lock (mTasks) { bool isFirstTask = (mTasks.Count == 0); mTasks.Enqueue(task); //Only start executing the task if this is the first task //Additional tasks will be executed normally as part of sequencing if (isFirstTask && !mTaskExecuting) RunNextTask(); } } ///  /// Clear all queued tasks. Any task currently executing will continue to execute. ///  public void Clear() { lock (mTasks) { mTasks.Clear(); } } ///  /// Wait until all currently queued tasks have completed executing. /// If no tasks are queued, this method will return immediately. /// This method does not prevent the race condition of a second thread /// queueing a task while one thread is entering the wait; /// if this is required, it must be synchronized externally. ///  public void WaitUntilAllComplete() { lock (mTasks) { while (mTasks.Count > 0 || mTaskExecuting) Monitor.Wait(mTasks); } } private void RunTask(Object state) { var task = (Action)state; task(); mTaskExecuting = false; RunNextTask(); } private void RunNextTask() { lock (mTasks) { if (mTasks.Count > 0) { mTaskExecuting = true; var task = mTasks.Dequeue(); ThreadPool.QueueUserWorkItem(RunTask, task); } else { //If anybody is waiting for tasks to be complete, let them know Monitor.PulseAll(mTasks); } } } } 

更新:我已修改代码以修复Simon所指出的主要错误。 现在通过unit testing,但我仍然欢迎观察。

这是我的第二个答案,假设您无法使用.NET 4.0(并希望对现有代码进行评论)。

QueueTask将第一个任务排入队列,获取isFirstTask = true,并启动一个新线程。 但是,另一个线程可能会在第一个线程正在处理时排队,而Count == 0 => isFirstTask = true,然后生成另一个线程。

此外,如果任务执行抛出exception(可能不一定会使所有内容崩溃,具体取决于exception处理),WaitUntilAllComplete将无限期挂起,从而导致它跳过对RunNextTask()的调用。

而WaitUntilAllComplete只是等待,直到没有更多的排队任务,而不是当前正在执行的任务正在执行(它们可能只是在ThreadPool中排队)或完成。

不要这样做。 (或者至少避免建立自己的东西。)

使用System.Threading.Tasks的东西(.NET 4.0中的新增function)。 创建一个Task [](大小取决于你想要的并行任务数),让他们在等待CancellationToken时从BlockingCollection中读取工作项。 您的WaitForAll实现将触发您的令牌,并调用Task.WaitAll(Task []) ,它将阻塞,直到您的所有任务完成。

它内置于4.0

如何:创建限制并发度的任务计划程序

您还可以使用自定义调度程序来实现默认调度程序不提供的function,例如严格的先进先出(FIFO)执行顺序。 以下示例演示如何创建自定义任务计划程序。 此调度程序允许您指定并发度。

我看到你的SerialAsyncTasker类有一些问题,但听起来你可能已经很好地掌握了这些问题所以我不会详细讨论这个主题(我可能会在稍后编辑我的答案以及更多细节)。 您在评论中指出您不能使用.NET 4.0function,也不能使用Reactive Extensions backport。 我建议你在专用线程上使用生产者 – 消费者模式和单个消费者。 这完全符合您按顺序异步执行任务的要求。

注意:您必须强化代码以支持正常关闭,处理exception等。

 public class SerialAsyncTasker { private BlockingCollection m_Queue = new BlockingCollection(); public SerialAsyncTasker() { var thread = new Thread( () => { while (true) { Action task = m_Queue.Take(); task(); } }); thread.IsBackground = true; thread.Start(); } public void QueueTask(Action task) { m_Queue.Add(task); } } 

太糟糕了,你无法使用.NET 4.0 BCL或Reactive Extension下载中的BlockingCollection,但不用担心。 实际上自己实现它并不太难。 你可以使用Stephen Toub的阻塞队列作为起点,只需重命名一些东西。

 public class BlockingCollection { private Queue m_Queue = new Queue(); public T Take() { lock (m_Queue) { while (m_Queue.Count <= 0) Monitor.Wait(m_Queue); return m_Queue.Dequeue(); } } public void Add(T value) { lock (m_Queue) { m_Queue.Enqueue(value); Monitor.Pulse(m_Queue); } } } 
 public class ParallelExcecuter { private readonly BlockingCollection _workItemHolder; public ParallelExcecuter(int maxDegreeOfParallelism) { _workItemHolder = new BlockingCollection(maxDegreeOfParallelism); } public void Submit(Action action) { _workItemHolder.Add(Task.Run(action).ContinueWith(t => { _workItemHolder.Take(); })); } public void WaitUntilWorkDone() { while (_workItemHolder.Count < 0) { Monitor.Wait(_workItemHolder); } } }