有没有更好的方法来等待排队的线程?

有没有更好的方法在执行另一个进程之前等待排队的线程?

目前我在做:

this.workerLocker = new object(); // Global variable this.RunningWorkers = arrayStrings.Length; // Global variable // Initiate process foreach (string someString in arrayStrings) { ThreadPool.QueueUserWorkItem(this.DoSomething, someString); Thread.Sleep(100); } // Waiting execution for all queued threads lock (this.workerLocker) // Global variable (object) { while (this.RunningWorkers > 0) { Monitor.Wait(this.workerLocker); } } // Do anything else Console.WriteLine("END"); 

 // Method DoSomething() definition public void DoSomething(object data) { // Do a slow process... . . . lock (this.workerLocker) { this.RunningWorkers--; Monitor.Pulse(this.workerLocker); } } 

您可能想看一下AutoResetEvent和ManualResetEvent。

这些都是针对这种情况的(在做“某事”之前等待ThreadPool线程完成)。

你会做这样的事情:

 static void Main(string[] args) { List resetEvents = new List(); foreach (var x in Enumerable.Range(1, WORKER_COUNT)) { ManualResetEvent resetEvent = new ManualResetEvent(); ThreadPool.QueueUserWorkItem(DoSomething, resetEvent); resetEvents.Add(resetEvent); } // wait for all ManualResetEvents WaitHandle.WaitAll(resetEvents.ToArray()); // You probably want to use an array instead of a List, a list was just easier for the example :-) } public static void DoSomething(object data) { ManualResetEvent resetEvent = data as ManualResetEvent; // Do something resetEvent.Set(); } 

编辑:忘记提及你可以等待单个线程,任何线程等等。 另外,根据您的情况,AutoResetEvent可以简化一些事情,因为它(顾名思义)可以自动发出事件信号:-)

如何使用仅使用MonitorForkJoin ;-p

 Forker p = new Forker(); foreach (var obj in collection) { var tmp = obj; p.Fork(delegate { DoSomeWork(tmp); }); } p.Join(); 

这个早期答案中显示的完整代码。

或者对于上限大小的生产者/消费者队列(线程安全等), 这里 。

除了Barrier之外,Henk Holterman指出(BTW他对Barrier的使用非常糟糕,请参阅我的回答),.NET 4.0提供了大量其他选项(在.NET 3.5中使用它们需要下载来自Microsoft的额外DLL )。 我在博客上写了一篇文章,列出了所有内容 ,但我最喜欢的是Parallel.ForEach:

 Parallel.ForEach(arrayStrings, someString => { DoSomething(someString); }); 

在幕后,Parallel.ForEach排队到新的和改进的线程池,并等待所有线程完成。

当我不得不等待任务完成时,我真的很喜欢Begin-End-Async Pattern 。

我建议你将BeginEnd包装在一个worker类中:

 public class StringWorker { private string m_someString; private IAsyncResult m_result; private Action DoSomethingDelegate; public StringWorker(string someString) { DoSomethingDelegate = DoSomething; } private void DoSomething() { throw new NotImplementedException(); } public IAsyncResult BeginDoSomething() { if (m_result != null) { throw new InvalidOperationException(); } m_result = DoSomethingDelegate.BeginInvoke(null, null); return m_result; } public void EndDoSomething() { DoSomethingDelegate.EndInvoke(m_result); } } 

要开始和工作,请使用以下代码段:

 List workers = new List(); foreach (var someString in arrayStrings) { StringWorker worker = new StringWorker(someString); worker.BeginDoSomething(); workers.Add(worker); } foreach (var worker in workers) { worker.EndDoSomething(); } Console.WriteLine("END"); 

就是这样。

旁注:如果要从BeginEnd返回结果,则将“Action”更改为Func并更改EndDoSomething以返回类型。

 public class StringWorker { private string m_someString; private IAsyncResult m_result; private Func DoSomethingDelegate; public StringWorker(string someString) { DoSomethingDelegate = DoSomething; } private string DoSomething() { throw new NotImplementedException(); } public IAsyncResult BeginDoSomething() { if (m_result != null) { throw new InvalidOperationException(); } m_result = DoSomethingDelegate.BeginInvoke(null, null); return m_result; } public string EndDoSomething() { return DoSomethingDelegate.EndInvoke(m_result); } } 

就在这里。

建议的方法

1)计数器和等待句柄

 int ActiveCount = 1; // 1 (!) is important EventWaitHandle ewhAllDone = new EventWaitHandle(false, ResetMode.Manual); 

2)添加循环

 foreach (string someString in arrayStrings) { Interlocked.Increment(ref ActiveCount); ThreadPool.QueueUserWorkItem(this.DoSomething, someString); // Thread.Sleep(100); // you really need this sleep ? } PostActionCheck(); ewhAllDone.Wait(); 

3) DoSomething应该是这样的

 { try { // some long executing code } finally { // .... PostActionCheck(); } } 

4) PostActionCheck在哪里

 void PostActionCheck() { if (Interlocked.Decrement(ref ActiveCount) == 0) ewhAllDone.Set(); } 

理念

ActiveCount初始化为1 ,然后增加n次。

PostActionCheck被称为n + 1次。 最后一个将触发事件。

这个解决方案的好处是使用单个内核对象(这是一个事件),以及轻量级API的2 * n + 1调用。 (可以少吗?)

PS

我在这里写了代码,我可能拼错了一些类名。

.NET 4.0有一个新类, Barrier 。

除此之外,您的方法并不是那么糟糕,如果在减量后RunningWorkers为0,您可以仅通过Pulsing进行优化。 那看起来像是:

 this.workerLocker = new object(); // Global variable this.RunningWorkers = arrayStrings.Length; // Global variable foreach (string someString in arrayStrings) { ThreadPool.QueueUserWorkItem(this.DoSomething, someString); //Thread.Sleep(100); } // Waiting execution for all queued threads Monitor.Wait(this.workerLocker); 

 // Method DoSomething() definition public void DoSomething(object data) { // Do a slow process... // ... lock (this.workerLocker) { this.RunningWorkers--; if (this.RunningWorkers == 0) Monitor.Pulse(this.workerLocker); } } 

您可以使用EventWaithandle或AutoResetEvent,但它们都包含在Win32 API中。 由于Monitor类是纯托管代码,因此我更喜欢Monitor。

我不确定是否真的,我最近做了类似的事情来扫描子网的每个IP以接受特定的端口。

我可以建议的一些事情可能会提高性能:

  • 使用ThreadPool的SetMaxThreads方法来调整性能(即平衡有大量线程一次运行,而不是在如此大量的线程上锁定时间)。

  • 在设置所有工作项时不要睡觉,没有真正的需要(我立即意识到。但是,请在DoSomething方法中睡觉,也许只有一毫秒,以允许其他线程在那里跳入,如果需要的话。

我相信你自己可以实现更自定义的方法,但我怀疑它比使用ThreadPool更有效。

PS我不是100%清楚使用显示器的原因,因为你还在锁定? 请注意,问题只是因为我之前没有使用过Monitor类,而不是因为我实际上怀疑它是否正在使用。

使用Spring Threading。 它内置了Barrier实现。

http://www.springsource.org/extensions/se-threading-net