c#Threadpool – 限制线程数

我正在开发一个控制台应用程序

我想使用Threadpool来执行Web下载。 这是一些假代码。

for (int loop=0; loop< 100; loop++) { ThreadPool.QueueUserWorkItem(new WaitCallback(GetPage), pageList[loop]); } snip private static void GetPage(object o) { //get the page } 

如何阻止我的代码启动两个以上(或十个或其他)同步线程?

我试过了

  ThreadPool.SetMaxThreads(1, 0); ThreadPool.SetMinThreads(1, 0); 

但他们似乎没有任何影响。

我会使用Parallel.For并相应地设置MaxDegreeOfParallelism

 Parallel.For(0, 1000, new ParallelOptions { MaxDegreeOfParallelism = 10 }, i => { GetPage(pageList[i]); }); 

只需反转以下代码:

 ThreadPool.SetMaxThreads(1, 0); ThreadPool.SetMinThreads(1, 0); 

至:

 ThreadPool.SetMinThreads(1, 0); ThreadPool.SetMaxThreads(1, 0); 

您不能将MaxThread设置为小于MinThread

就个人而言,我会使用SmartThreadPool并单独保留ThreadPool。 但是,这可能是您想要的: C#线程池限制线程

链接中包含的代码(请给原作者信用,不是我)

 System.Threading.Semaphore S = new System.Threading.Semaphore(3, 3); try { // wait your turn (decrement) S.WaitOne(); // do your thing } finally { // release so others can go (increment) S.Release(); } 

描述

您可以使用ThreadPool.SetMaxThreads方法执行此操作。

但是使用ThreadPool for WebRequest存在一些问题。 例如,阅读(ThreadPool或HttpWebRequest中的错误?)

样品

 ThreadPool.SetMaxThreads(2,2); 

编辑

就个人而言,我会使用Linq AsParallel。

更多信息

  • MSDN – ThreadPool.SetMaxThreads方法
  • MSDN – ParallelEnumerable.AsParallel方法

查看ThreadPool.SetMaxThreads的参数。 第一个参数是工作线程的数量,第二个参数是异步线程的数量,这是您正在谈论的那个。

在文档的下面,它说:

您不能将工作线程数或I / O完成线程数设置为小于计算机中处理器数的数字。

听起来你正在尝试将ThreadPool用于不打算用于的东西。 如果您想限制下载量,请创建一个为您管理此类的类,因为ThreadPool不一定是您问题的完整解决方案。

我建议一个类在ThreadPool中启动两个线程并等待回调。 当它收到一个回调完成其中一个线程队列一个新线程。

如果你收紧到.Net 2.0,你可以使用以下技术:

知道你将一个任务排入ThreadPool的事实,它将创建一个新线程(当然,如果没有免费的线程),你会等到这之前有一个免费的线程。 为此目的,使用BlockingCounter类(如下所述),一旦达到限制,将等待递增,直到有人(另一个线程)递减它为止。 然后它进入“关闭”状态,表示不会进行新的增量并等待完成。

下面的示例显示最多4个任务,总数为10。

 class Program { static int s_numCurrentThreads = 0; static Random s_rnd = new Random(); static void Main(string[] args) { int maxParallelTasks = 4; int totalTasks = 10; using (BlockingCounter blockingCounter = new BlockingCounter(maxParallelTasks)) { for (int i = 1; i <= totalTasks; i++) { Console.WriteLine("Submitting task {0}", i); blockingCounter.WaitableIncrement(); if (!ThreadPool.QueueUserWorkItem((obj) => { try { ThreadProc(obj); } catch (Exception ex) { Console.Error.WriteLine("Task {0} failed: {1}", obj, ex.Message); } finally { // Exceptions are possible here too, // but proper error handling is not the goal of this sample blockingCounter.WaitableDecrement(); } }, i)) { blockingCounter.WaitableDecrement(); Console.Error.WriteLine("Failed to submit task {0} for execution.", i); } } Console.WriteLine("Waiting for copmletion..."); blockingCounter.CloseAndWait(30000); } Console.WriteLine("Work done!"); Console.ReadKey(); } static void ThreadProc (object obj) { int taskNumber = (int) obj; int numThreads = Interlocked.Increment(ref s_numCurrentThreads); Console.WriteLine("Task {0} started. Total: {1}", taskNumber, numThreads); int sleepTime = s_rnd.Next(0, 5); Thread.Sleep(sleepTime * 1000); Console.WriteLine("Task {0} finished.", taskNumber); Interlocked.Decrement(ref s_numCurrentThreads); } 

它使用BlockingCounter类,该类基于此处发布的Marc Gravell的SizeQueue,但没有计数器而不是队列。 当您结束排队新线程时,请调用Close()方法,然后等待它完成。

 public class BlockingCounter : IDisposable { private int m_Count; private object m_counterLock = new object(); private bool m_isClosed = false; private volatile bool m_isDisposed = false; private int m_MaxSize = 0; private ManualResetEvent m_Finished = new ManualResetEvent(false); public BlockingCounter(int maxSize = 0) { if (maxSize < 0) throw new ArgumentOutOfRangeException("maxSize"); m_MaxSize = maxSize; } public void WaitableIncrement(int timeoutMs = Timeout.Infinite) { lock (m_counterLock) { while (m_MaxSize > 0 && m_Count >= m_MaxSize) { CheckClosedOrDisposed(); if (!Monitor.Wait(m_counterLock, timeoutMs)) throw new TimeoutException("Failed to wait for counter to decrement."); } CheckClosedOrDisposed(); m_Count++; if (m_Count == 1) { Monitor.PulseAll(m_counterLock); } } } public void WaitableDecrement(int timeoutMs = Timeout.Infinite) { lock (m_counterLock) { try { while (m_Count == 0) { CheckClosedOrDisposed(); if (!Monitor.Wait(m_counterLock, timeoutMs)) throw new TimeoutException("Failed to wait for counter to increment."); } CheckDisposed(); m_Count--; if (m_MaxSize == 0 || m_Count == m_MaxSize - 1) Monitor.PulseAll(m_counterLock); } finally { if (m_isClosed && m_Count == 0) m_Finished.Set(); } } } void CheckClosedOrDisposed() { if (m_isClosed) throw new Exception("The counter is closed"); CheckDisposed(); } void CheckDisposed() { if (m_isDisposed) throw new ObjectDisposedException("The counter has been disposed."); } public void Close() { lock (m_counterLock) { CheckDisposed(); m_isClosed = true; Monitor.PulseAll(m_counterLock); } } public bool WaitForFinish(int timeoutMs = Timeout.Infinite) { CheckDisposed(); lock (m_counterLock) { if (m_Count == 0) return true; } return m_Finished.WaitOne(timeoutMs); } public void CloseAndWait (int timeoutMs = Timeout.Infinite) { Close(); WaitForFinish(timeoutMs); } public void Dispose() { if (!m_isDisposed) { m_isDisposed = true; lock (m_counterLock) { // Wake up all waiting threads, so that they know the object // is disposed and there's nothing to wait anymore Monitor.PulseAll(m_counterLock); } m_Finished.Close(); } } } 

结果将是这样的:

 Submitting task 1 Submitting task 2 Submitting task 3 Submitting task 4 Submitting task 5 Task 1 started. Total: 1 Task 1 finished. Task 3 started. Total: 1 Submitting task 6 Task 2 started. Total: 2 Task 3 finished. Task 6 started. Total: 4 Task 5 started. Total: 3 Task 4 started. Total: 4 Submitting task 7 Task 4 finished. Submitting task 8 Task 7 started. Total: 4 Task 5 finished. Submitting task 9 Task 7 finished. Task 8 started. Total: 4 Task 9 started. Total: 4 Submitting task 10 Task 2 finished. Waiting for copmletion... Task 10 started. Total: 4 Task 10 finished. Task 6 finished. Task 8 finished. Task 9 finished. Work done!