Parallel.ForEach不断产生新线程

当我在我的程序中使用Parallel.ForEach时,我发现有些线程似乎永远不会完成。 实际上,它一直在反复产生新的线程,这种行为是我没想到的,绝对不想要的。

我能够使用以下代码重现此行为,就像我的“真实”程序一样,它们都使用处理器和内存(.NET 4.0代码):

public class Node { public Node Previous { get; private set; } public Node(Node previous) { Previous = previous; } } public class Program { public static void Main(string[] args) { DateTime startMoment = DateTime.Now; int concurrentThreads = 0; var jobs = Enumerable.Range(0, 2000); Parallel.ForEach(jobs, delegate(int jobNr) { Interlocked.Increment(ref concurrentThreads); int heavyness = jobNr % 9; //Give the processor and the garbage collector something to do... List nodes = new List(); Node current = null; for (int y = 0; y < 1024 * 1024 * heavyness; y++) { current = new Node(current); nodes.Add(current); } TimeSpan elapsed = DateTime.Now - startMoment; int threadsRemaining = Interlocked.Decrement(ref concurrentThreads); Console.WriteLine("[{0:mm\\:ss}] Job {1,4} complete. {2} threads remaining.", elapsed, jobNr, threadsRemaining); }); } } 

在我的四核上运行时,它最初启动时有4个并发线程,正如您所期望的那样。 但是,随着时间的推移,越来越多的线程被创建。 最终,该程序然后抛出OutOfMemoryException:

 [00:00] Job 0 complete. 3 threads remaining. [00:01] Job 1 complete. 4 threads remaining. [00:01] Job 2 complete. 4 threads remaining. [00:02] Job 3 complete. 4 threads remaining. [00:05] Job 9 complete. 5 threads remaining. [00:05] Job 4 complete. 5 threads remaining. [00:05] Job 5 complete. 5 threads remaining. [00:05] Job 10 complete. 5 threads remaining. [00:08] Job 11 complete. 5 threads remaining. [00:08] Job 6 complete. 5 threads remaining. ... [00:55] Job 67 complete. 7 threads remaining. [00:56] Job 81 complete. 8 threads remaining. ... [01:54] Job 107 complete. 11 threads remaining. [02:00] Job 121 complete. 12 threads remaining. .. [02:55] Job 115 complete. 19 threads remaining. [03:02] Job 166 complete. 21 threads remaining. ... [03:41] Job 113 complete. 28 threads remaining.  

上述实验的内存使用情况图如下:

处理器和内存使用量

截图是荷兰语;顶部代表处理器使用情况,底部代表内存使用情况。 )正如您所看到的,几乎每次垃圾收集器阻塞时都会产生一个新线程(可以看出)在内存使用的萧条)。

任何人都可以解释为什么会这样,我能做些什么呢? 我只是希望.NET停止生成新线程,并首先完成现有线程……

您可以通过使用MaxDegreeOfParallelism属性集指定ParallelOptions实例来限制创建的最大线程数:

 var jobs = Enumerable.Range(0, 2000); ParallelOptions po = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }; Parallel.ForEach(jobs, po, jobNr => { // ... }); 

至于为什么你会得到你正在观察的行为:默认情况下,TPL (它是PLINQ的基础)可以自由地猜测要使用的最佳线程数。 每当并行任务阻塞时,任务调度程序可以创建新线程以保持进度。 在你的情况下,阻塞可能会隐含发生; 例如,通过Console.WriteLine调用,或(在您观察时)垃圾回收期间。

从并发级别调整任务并行库(要使用多少线程?) :

由于TPL默认策略是每个处理器使用一个线程,我们可以得出结论,TPL最初假定任务的工作负载是~100%工作和0%等待,并且如果初始假设失败并且任务进入等待状态(即开始阻止) – TPL可以自由地添加线程。

您可能应该阅读一下任务调度程序的工作原理。

http://msdn.microsoft.com/en-us/library/ff963549.aspx (页面的后半部分)

“.NET线程池自动管理池中的工作线程数。它根据内置的启发式方法添加和删除线程..NET线程池有两个主要的注入线程机制:一个添加工作者的饥饿避免机制线程如果它看不到排队项目没有进展和爬山试探,试图在尽可能少的线程使用时最大化吞吐量。

避免饥饿的目的是防止僵局。 当工作线程等待同步事件时,可能会发生这种死锁,该同步事件只能由线程池的全局或本地队列中仍未处理的工作项来满足。 如果存在固定数量的工作线程,并且所有这些线程都被类似地阻止,则系统将无法进一步取得进一步进展。 添加新的工作线程可以解决问题。

爬山启发式的目标是在线程被I / O或其他停止处理器的等待条件阻塞时提高核心的利用率。 默认情况下,托管线程池每个核心有一个工作线程。 如果其中一个工作线程被阻塞,则核心可能未充分利用,具体取决于计算机的总体工作负载。 线程注入逻辑不区分被阻塞的线程和执行冗长的处理器密集型操作的线程。 因此,每当线程池的全局或本地队列包含挂起的工作项时,需要很长时间才能运行的活动工作项(超过半秒)可以触发创建新的线程池工作线程。

您可以将任务标记为LongRunning,但这会产生从线程池外部为其分配线程的副作用,这意味着无法内联任务。

请记住, ParallelFor将作为块给出的工作视为块,因此即使一个循环中的工作相当小,由外观调用的任务所完成的整体工作对于调度程序来说可能看起来更长。

大多数对GC及其自身的调用都没有阻塞(它在一个单独的线程上运行)但是如果你等待GC完成则会阻塞。 还要记住,GC正在重新排列内存,因此如果您在运行GC时尝试分配内存,则可能会产生一些副作用(和阻塞)。 我没有具体细节,但我知道PPL有一些专门用于并发内存管理的内存分配function。

查看代码的输出,似乎事情正在运行很多秒。 所以我看到线程注入并不奇怪。 但是我似乎记得默认的线程池大小大约是30个线程(可能取决于系统上的核心数)。 在你的代码分配之前,一个线程占用大约一MB的内存,所以我不清楚你为什么会在这里得到一个内存不足的例外。

我发布了后续问题“如何计算.NET应用程序中并发线程的数量?”

如果要直接计算线程,它们在Parallel.For()中的数量大多数((非常罕见且不显着地减少)仅增加并且在循环完成后不会释放。

使用,在发布和调试模式下检查此项

 ParallelOptions po = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }; 

没有

数字各不相同,但结论相同。

这是我正在使用的准备好的代码,如果有人想玩:

 using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace Edit4Posting { public class Node { public Node Previous { get; private set; } public Node(Node previous) { Previous = previous; } } public class Edit4Posting { public static void Main(string[] args) { int concurrentThreads = 0; int directThreadsCount = 0; int diagThreadCount = 0; var jobs = Enumerable.Range(0, 160); ParallelOptions po = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }; Parallel.ForEach(jobs, po, delegate(int jobNr) //Parallel.ForEach(jobs, delegate(int jobNr) { int threadsRemaining = Interlocked.Increment(ref concurrentThreads); int heavyness = jobNr % 9; //Give the processor and the garbage collector something to do... List nodes = new List(); Node current = null; //for (int y = 0; y < 1024 * 1024 * heavyness; y++) for (int y = 0; y < 1024 * 24 * heavyness; y++) { current = new Node(current); nodes.Add(current); } //******************************* directThreadsCount = Process.GetCurrentProcess().Threads.Count; //******************************* threadsRemaining = Interlocked.Decrement(ref concurrentThreads); Console.WriteLine("[Job {0} complete. {1} threads remaining but directThreadsCount == {2}", jobNr, threadsRemaining, directThreadsCount); }); Console.WriteLine("FINISHED"); Console.ReadLine(); } } }