有没有更好的方法来等待排队的线程?
有没有更好的方法在执行另一个进程之前等待排队的线程?
目前我在做:
this.workerLocker = new object(); // Global variable this.RunningWorkers = arrayStrings.Length; // Global variable // Initiate process foreach (string someString in arrayStrings) { ThreadPool.QueueUserWorkItem(this.DoSomething, someString); Thread.Sleep(100); } // Waiting execution for all queued threads lock (this.workerLocker) // Global variable (object) { while (this.RunningWorkers > 0) { Monitor.Wait(this.workerLocker); } } // Do anything else Console.WriteLine("END");
// Method DoSomething() definition public void DoSomething(object data) { // Do a slow process... . . . lock (this.workerLocker) { this.RunningWorkers--; Monitor.Pulse(this.workerLocker); } }
您可能想看一下AutoResetEvent和ManualResetEvent。
这些都是针对这种情况的(在做“某事”之前等待ThreadPool线程完成)。
你会做这样的事情:
static void Main(string[] args) { List resetEvents = new List (); foreach (var x in Enumerable.Range(1, WORKER_COUNT)) { ManualResetEvent resetEvent = new ManualResetEvent(); ThreadPool.QueueUserWorkItem(DoSomething, resetEvent); resetEvents.Add(resetEvent); } // wait for all ManualResetEvents WaitHandle.WaitAll(resetEvents.ToArray()); // You probably want to use an array instead of a List, a list was just easier for the example :-) } public static void DoSomething(object data) { ManualResetEvent resetEvent = data as ManualResetEvent; // Do something resetEvent.Set(); }
编辑:忘记提及你可以等待单个线程,任何线程等等。 另外,根据您的情况,AutoResetEvent可以简化一些事情,因为它(顾名思义)可以自动发出事件信号:-)
如何使用仅使用Monitor
的Fork
和Join
;-p
Forker p = new Forker(); foreach (var obj in collection) { var tmp = obj; p.Fork(delegate { DoSomeWork(tmp); }); } p.Join();
这个早期答案中显示的完整代码。
或者对于上限大小的生产者/消费者队列(线程安全等), 这里 。
除了Barrier之外,Henk Holterman指出(BTW他对Barrier的使用非常糟糕,请参阅我的回答),.NET 4.0提供了大量其他选项(在.NET 3.5中使用它们需要下载来自Microsoft的额外DLL )。 我在博客上写了一篇文章,列出了所有内容 ,但我最喜欢的是Parallel.ForEach:
Parallel.ForEach(arrayStrings, someString => { DoSomething(someString); });
在幕后,Parallel.ForEach排队到新的和改进的线程池,并等待所有线程完成。
当我不得不等待任务完成时,我真的很喜欢Begin-End-Async Pattern 。
我建议你将BeginEnd包装在一个worker类中:
public class StringWorker { private string m_someString; private IAsyncResult m_result; private Action DoSomethingDelegate; public StringWorker(string someString) { DoSomethingDelegate = DoSomething; } private void DoSomething() { throw new NotImplementedException(); } public IAsyncResult BeginDoSomething() { if (m_result != null) { throw new InvalidOperationException(); } m_result = DoSomethingDelegate.BeginInvoke(null, null); return m_result; } public void EndDoSomething() { DoSomethingDelegate.EndInvoke(m_result); } }
要开始和工作,请使用以下代码段:
List workers = new List (); foreach (var someString in arrayStrings) { StringWorker worker = new StringWorker(someString); worker.BeginDoSomething(); workers.Add(worker); } foreach (var worker in workers) { worker.EndDoSomething(); } Console.WriteLine("END");
就是这样。
旁注:如果要从BeginEnd返回结果,则将“Action”更改为Func并更改EndDoSomething以返回类型。
public class StringWorker { private string m_someString; private IAsyncResult m_result; private Func DoSomethingDelegate; public StringWorker(string someString) { DoSomethingDelegate = DoSomething; } private string DoSomething() { throw new NotImplementedException(); } public IAsyncResult BeginDoSomething() { if (m_result != null) { throw new InvalidOperationException(); } m_result = DoSomethingDelegate.BeginInvoke(null, null); return m_result; } public string EndDoSomething() { return DoSomethingDelegate.EndInvoke(m_result); } }
就在这里。
建议的方法
1)计数器和等待句柄
int ActiveCount = 1; // 1 (!) is important EventWaitHandle ewhAllDone = new EventWaitHandle(false, ResetMode.Manual);
2)添加循环
foreach (string someString in arrayStrings) { Interlocked.Increment(ref ActiveCount); ThreadPool.QueueUserWorkItem(this.DoSomething, someString); // Thread.Sleep(100); // you really need this sleep ? } PostActionCheck(); ewhAllDone.Wait();
3) DoSomething
应该是这样的
{ try { // some long executing code } finally { // .... PostActionCheck(); } }
4) PostActionCheck
在哪里
void PostActionCheck() { if (Interlocked.Decrement(ref ActiveCount) == 0) ewhAllDone.Set(); }
理念
ActiveCount初始化为1
,然后增加n
次。
PostActionCheck
被称为n + 1
次。 最后一个将触发事件。
这个解决方案的好处是使用单个内核对象(这是一个事件),以及轻量级API的2 * n + 1
调用。 (可以少吗?)
PS
我在这里写了代码,我可能拼错了一些类名。
.NET 4.0有一个新类, Barrier 。
除此之外,您的方法并不是那么糟糕,如果在减量后RunningWorkers为0,您可以仅通过Pulsing进行优化。 那看起来像是:
this.workerLocker = new object(); // Global variable this.RunningWorkers = arrayStrings.Length; // Global variable foreach (string someString in arrayStrings) { ThreadPool.QueueUserWorkItem(this.DoSomething, someString); //Thread.Sleep(100); } // Waiting execution for all queued threads Monitor.Wait(this.workerLocker);
// Method DoSomething() definition public void DoSomething(object data) { // Do a slow process... // ... lock (this.workerLocker) { this.RunningWorkers--; if (this.RunningWorkers == 0) Monitor.Pulse(this.workerLocker); } }
您可以使用EventWaithandle或AutoResetEvent,但它们都包含在Win32 API中。 由于Monitor类是纯托管代码,因此我更喜欢Monitor。
我不确定是否真的,我最近做了类似的事情来扫描子网的每个IP以接受特定的端口。
我可以建议的一些事情可能会提高性能:
-
使用ThreadPool的SetMaxThreads方法来调整性能(即平衡有大量线程一次运行,而不是在如此大量的线程上锁定时间)。
-
在设置所有工作项时不要睡觉,没有真正的需要(我立即意识到。但是,请在DoSomething方法中睡觉,也许只有一毫秒,以允许其他线程在那里跳入,如果需要的话。
我相信你自己可以实现更自定义的方法,但我怀疑它比使用ThreadPool更有效。
PS我不是100%清楚使用显示器的原因,因为你还在锁定? 请注意,问题只是因为我之前没有使用过Monitor类,而不是因为我实际上怀疑它是否正在使用。
使用Spring Threading。 它内置了Barrier实现。