为什么Parallel.ForEach比AsParallel()快得多.ForAll()尽管MSDN建议不然?

我一直在做一些调查,看看我们如何创建一个贯穿树的multithreading应用程序。

为了找到如何以最佳方式实现这一点,我创建了一个运行在我的C:\ disk中的测试应用程序并打开所有目录。

class Program { static void Main(string[] args) { //var startDirectory = @"C:\The folder\RecursiveFolder"; var startDirectory = @"C:\"; var w = Stopwatch.StartNew(); ThisIsARecursiveFunction(startDirectory); Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds); Console.ReadKey(); } public static void ThisIsARecursiveFunction(String currentDirectory) { var lastBit = Path.GetFileName(currentDirectory); var depth = currentDirectory.Count(t => t == '\\'); //Console.WriteLine(depth + ": " + currentDirectory); try { var children = Directory.GetDirectories(currentDirectory); //Edit this mode to switch what way of parallelization it should use int mode = 3; switch (mode) { case 1: foreach (var child in children) { ThisIsARecursiveFunction(child); } break; case 2: children.AsParallel().ForAll(t => { ThisIsARecursiveFunction(t); }); break; case 3: Parallel.ForEach(children, t => { ThisIsARecursiveFunction(t); }); break; default: break; } } catch (Exception eee) { //Exception might occur for directories that can't be accessed. } } } 

然而,我遇到的是,当在模式3(Parallel.ForEach)中运行时,代码在大约2.5秒内完成(是的,我有一个SSD;))。 在没有并行化的情况下运行代码,它在大约8秒内完成。 并且在模式2(AsParalle.ForAll())中运行代码需要几乎无限的时间。

在检查进程资源管理器时,我也遇到了一些奇怪的事实:

 Mode1 (No Parallelization): Cpu: ~25% Threads: 3 Time to complete: ~8 seconds Mode2 (AsParallel().ForAll()): Cpu: ~0% Threads: Increasing by one per second (I find this strange since it seems to be waiting on the other threads to complete or a second timeout.) Time to complete: 1 second per node so about 3 days??? Mode3 (Parallel.ForEach()): Cpu: 100% Threads: At most 29-30 Time to complete: ~2.5 seconds 

我发现特别奇怪的是,Parallel.ForEach似乎忽略了在AsParallel()时仍在运行的任何父线程/任务.ForAll()似乎等待前一个任务完成(这将不会很快,因为所有父任务仍在等待他们的孩子任务完成)。

我在MSDN上读到的内容也是:“在可能的情况下,我们会更喜欢每一个人”

资料来源: http : //msdn.microsoft.com/en-us/library/dd997403(v = vs1010).aspx

有没有人知道为什么会这样?

编辑1:

根据Matthew Watson的要求,我首先将树加载到内存中,然后再循环遍历它。 现在按顺序完成树的加载。

但结果是一样的。 Unparallelized和Parallel.ForEach现在在大约0.05秒内完成整个树,而AsParallel()。ForAll仍然只是每秒步进1步。

码:

 class Program { private static DirWithSubDirs RootDir; static void Main(string[] args) { //var startDirectory = @"C:\The folder\RecursiveFolder"; var startDirectory = @"C:\"; Console.WriteLine("Loading file system into memory..."); RootDir = new DirWithSubDirs(startDirectory); Console.WriteLine("Done"); var w = Stopwatch.StartNew(); ThisIsARecursiveFunctionInMemory(RootDir); Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds); Console.ReadKey(); } public static void ThisIsARecursiveFunctionInMemory(DirWithSubDirs currentDirectory) { var depth = currentDirectory.Path.Count(t => t == '\\'); Console.WriteLine(depth + ": " + currentDirectory.Path); var children = currentDirectory.SubDirs; //Edit this mode to switch what way of parallelization it should use int mode = 2; switch (mode) { case 1: foreach (var child in children) { ThisIsARecursiveFunctionInMemory(child); } break; case 2: children.AsParallel().ForAll(t => { ThisIsARecursiveFunctionInMemory(t); }); break; case 3: Parallel.ForEach(children, t => { ThisIsARecursiveFunctionInMemory(t); }); break; default: break; } } } class DirWithSubDirs { public List SubDirs = new List(); public String Path { get; private set; } public DirWithSubDirs(String path) { this.Path = path; try { SubDirs = Directory.GetDirectories(path).Select(t => new DirWithSubDirs(t)).ToList(); } catch (Exception eee) { //Ignore directories that can't be accessed } } } 

编辑2:

在阅读Matthew评论的更新后,我试图将以下代码添加到程序中:

 ThreadPool.SetMinThreads(4000, 16); ThreadPool.SetMaxThreads(4000, 16); 

然而,这并没有改变AsParallel的形状。 在减速到1步/秒之前,前8个步骤仍在执行。

(额外注意,我当前忽略了当我无法通过Directory.GetDirectories()周围的Try Catch块访问目录时发生的exception)

编辑3:

另外我最感兴趣的是Parallel.ForEach和AsParallel.ForAll之间的区别,因为对我而言,由于某种原因,第二个为每次递归创建一个Thread而第一个曾经处理大约30个线程中的所有内容最大。 (以及为什么MSDN建议使用AsParallel,即使它创建了这么multithreading,并且有超过1秒的超时)

编辑4:

我发现的另一个奇怪的事情是:当我尝试将线程池上的MinThreads设置为高于1023时,它似乎忽略了该值并缩回到8或16左右:ThreadPool.SetMinThreads(1023,16);

仍然当我使用1023时,它会非常快地完成前1023个元素,然后回到我一直在经历的缓慢节奏。

注意:现在还创建了超过1000个线程(相比整个Parallel.ForEach一个为30)。

这是否意味着Parallel.ForEach在处理任务方面更聪明?

更多信息,当您将值设置为1023时,此代码打印两次8 – 8 :(当您将值设置为1023或更低时,它会打印正确的值)

  int threadsMin; int completionMin; ThreadPool.GetMinThreads(out threadsMin, out completionMin); Console.WriteLine("Cur min threads: " + threadsMin + " and the other thing: " + completionMin); ThreadPool.SetMinThreads(1023, 16); ThreadPool.SetMaxThreads(1023, 16); ThreadPool.GetMinThreads(out threadsMin, out completionMin); Console.WriteLine("Now min threads: " + threadsMin + " and the other thing: " + completionMin); 

编辑5:

根据Dean的要求,我创建了另一个案例来手动创建任务:

 case 4: var taskList = new List(); foreach (var todo in children) { var itemTodo = todo; taskList.Add(Task.Run(() => ThisIsARecursiveFunctionInMemory(itemTodo))); } Task.WaitAll(taskList.ToArray()); break; 

这也和Parallel.ForEach()循环一样快。 所以我们仍然没有答案为什么AsParallel()。ForAll()是如此慢。

这个问题很可调试,当你遇到线程问题时,这种情况很不寻常。 这里的基本工具是Debug> Windows> Threads调试器窗口。 向您显示活动线程,并向您展示其堆栈跟踪。 你会很容易看到,一旦它变慢,你就会有几十个活跃的线程都被卡住了。 他们的堆栈跟踪看起来都一样:

  mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout, bool exitContext) + 0x16 bytes mscorlib.dll!System.Threading.Monitor.Wait(object obj, int millisecondsTimeout) + 0x7 bytes mscorlib.dll!System.Threading.ManualResetEventSlim.Wait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x182 bytes mscorlib.dll!System.Threading.Tasks.Task.SpinThenBlockingWait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x93 bytes mscorlib.dll!System.Threading.Tasks.Task.InternalRunSynchronously(System.Threading.Tasks.TaskScheduler scheduler, bool waitForCompletion) + 0xba bytes mscorlib.dll!System.Threading.Tasks.Task.RunSynchronously(System.Threading.Tasks.TaskScheduler scheduler) + 0x13 bytes System.Core.dll!System.Linq.Parallel.SpoolingTask.SpoolForAll(System.Linq.Parallel.QueryTaskGroupState groupState, System.Linq.Parallel.PartitionedStream partitions, System.Threading.Tasks.TaskScheduler taskScheduler) Line 172 C# // etc.. 

每当你看到这样的东西时,你应该立即想到防火管的问题 。 在比赛和死锁之后,可能是第三个最常见的线程错误。

您可以推断,既然您知道原因,那么代码的问题在于每个完成的线程都会增加N个线程。 其中N是目录中的平均子目录数。 实际上,线程数呈指数增长 ,这总是很糟糕。 如果N = 1,它将只能保持控制,当然,这在典型的磁盘上永远不会发生。

请注意,就像几乎任何线程问题一样,这种不当行为往往会重演得很糟糕。 机器中的SSD往往会隐藏它。 因此,您的机器中的RAM也可以在第二次运行时快速完成并且无故障。 因为您现在将从文件系统缓存而不是磁盘读取,速度非常快。 修改ThreadPool.SetMinThreads()也隐藏它,但它无法修复它。 它永远不会修复任何问题,它只会隐藏它们。 因为无论发生什么,指数数字总是会超过设定的最小线程数。 您只能希望它在完成之前完成对驱动器的迭代。 对于拥有大驱动器的用户来说,空闲的希望。

ParallelEnumerable.ForAll()和Parallel.ForEach()之间的区别现在也许很容易解释。 你可以从堆栈跟踪中看出ForAll()做了一些顽皮的事情,RunSynchronously()方法会阻塞,直到完成所有线程。 阻塞是线程池线程不应该做的事情,它会使线程池变得粗糙并且不允许它为另一个作业安排处理器。 并且具有您观察到的效果,线程池很快就被等待N个其他线程完成的线程所淹没。 没有发生这种情况,他们正在游泳池中等待并且没有安排,因为他们已经有很多活跃的。

这是一个死锁场景,非常常见,但线程池管理器有一个解决方法。 它会监视活动的线程池线程,并在它们未及时完成时进入。 然后它允许一个额外的线程启动,比SetMinThreads()设置的最小线程多一个。 但是不超过SetMaxThreads()设置的最大值,有太多活动的tp线程是有风险的并且可能触发OOM。 这确实解决了死锁,它完成了一个ForAll()调用。 但是这种情况发生的速度非常慢,线程池每秒只执行两次。 在赶上之前你会没有耐心。

Parallel.ForEach()没有这个问题,它不会阻塞所以不会使池中的问题。

似乎是解决方案,但请记住,您的程序仍在消耗机器的内存,为池添加更多等待的tp线程。 这也可能导致程序崩溃,因为你拥有大量内存并且线程池不会使用大量内存来跟踪请求,所以它不太可能。 然而,一些程序员也完成了这一点 。

解决方案非常简单,只是不要使用线程。 它是有害的 ,只有一个磁盘时没有并发性。 而且它不喜欢被multithreading所征服。 在主轴驱动器上特别糟糕,头部搜索非常非常慢。 SSD可以做得更好,但它仍然需要50微秒,这是您不想要或不需要的开销。 访问磁盘的理想线程数是一个你不能期望缓存的磁盘。

首先要注意的是,您正在尝试并行化IO绑定操作,这将显着扭曲时序。

需要注意的第二件事是并行化任务的本质:您递归地降序目录树。 如果您创建多个线程来执行此操作,则每个线程可能同时访问磁盘的不同部分 – 这将导致磁盘读取头跳到整个位置并大大减慢速度。

尝试更改测试以创建内存中的树,并使用多个线程来访问它。 然后,您将能够正确地比较时间,而不会使结果失真超出所有有用性。

此外,您可能正在创建大量线程,并且它们(默认情况下)将是线程池线程。 拥有大量线程实际上会在超出处理器内核数量时降低速度。

另请注意,当超过线程池最小线程数(由ThreadPool.GetMinThreads()定义)时,线程池管理器会在每个新线程池线程创建之间引入延迟。 (我认为每个新主题大概是0.5秒)。

此外,如果线程数超过ThreadPool.GetMaxThreads()返回的值,则创建线程将阻塞,直到其他线程退出。 我想这可能会发生。

您可以通过调用ThreadPool.SetMaxThreads()ThreadPool.SetMinThreads()来增加这些值,并查看它是否有任何区别来测试此假设。

(最后请注意,如果您真的试图以递归方式从C:\下降,那么当它到达受保护的OS文件夹时,几乎肯定会遇到IOexception。)

注意:设置最大/最小线程池线程,如下所示:

 ThreadPool.SetMinThreads(4000, 16); ThreadPool.SetMaxThreads(4000, 16); 

跟进

我已尝试使用如上所述设置的线程池线程计数的测试代码,具有以下结果(不是在我的整个C:\驱动器上运行,而是在较小的子集上运行):

  • 模式1耗时06.5秒。
  • 模式2耗时15.7秒。
  • 模式3耗时16.4秒。

这符合我的期望; 添加一个线程加载实际上使它比单线程慢,并且两个并行方法大致相同的时间。


如果其他人想要研究这个,这里有一些确定性的测试代码(OP的代码不可重复,因为我们不知道他的目录结构)。

 using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading.Tasks; namespace Demo { internal class Program { private static DirWithSubDirs RootDir; private static void Main() { Console.WriteLine("Loading file system into memory..."); RootDir = new DirWithSubDirs("Root", 4, 4); Console.WriteLine("Done"); //ThreadPool.SetMinThreads(4000, 16); //ThreadPool.SetMaxThreads(4000, 16); var w = Stopwatch.StartNew(); ThisIsARecursiveFunctionInMemory(RootDir); Console.WriteLine("Elapsed seconds: " + w.Elapsed.TotalSeconds); Console.ReadKey(); } public static void ThisIsARecursiveFunctionInMemory(DirWithSubDirs currentDirectory) { var depth = currentDirectory.Path.Count(t => t == '\\'); Console.WriteLine(depth + ": " + currentDirectory.Path); var children = currentDirectory.SubDirs; //Edit this mode to switch what way of parallelization it should use int mode = 3; switch (mode) { case 1: foreach (var child in children) { ThisIsARecursiveFunctionInMemory(child); } break; case 2: children.AsParallel().ForAll(t => { ThisIsARecursiveFunctionInMemory(t); }); break; case 3: Parallel.ForEach(children, t => { ThisIsARecursiveFunctionInMemory(t); }); break; default: break; } } } internal class DirWithSubDirs { public List SubDirs = new List(); public String Path { get; private set; } public DirWithSubDirs(String path, int width, int depth) { this.Path = path; if (depth > 0) for (int i = 0; i < width; ++i) SubDirs.Add(new DirWithSubDirs(path + "\\" + i, width, depth - 1)); } } } 

Parallel.For和.ForEach方法在内部实现,等同于在Tasks中运行迭代,例如,循环如:

 Parallel.For(0, N, i => { DoWork(i); }); 

相当于:

 var tasks = new List(N); for(int i=0; i DoWork((int)state), i)); } Task.WaitAll(tasks.ToArray()); 

并且从每次迭代的角度来看,可能与其他迭代并行运行,这是一个好的心理模型,但不会发生在现实中。 事实上,并行不一定每次迭代使用一个任务,因为这比必要的开销要大得多。 Parallel.ForEach尝试尽可能快地使用完成循环所需的最少任务数。 当线程变得可用于处理这些任务时,它会旋转任务,并且每个任务都参与管理方案(我认为它称为分块):任务要求完成多次迭代,获取它们,然后处理工作,然后回去更多。 块大小根据参与的任务数量,机器上的负载等而变化。

PLINQ的.AsParallel()有不同的实现,但它“仍然可以”类似地将多次迭代提取到临时存储中,在线程中进行计算(但不是作为任务),并将查询结果放入一个小缓冲区。 (你得到一些基于ParallelQuery的东西,然后进一步.Whatever()函数绑定到另一组提供并行实现的扩展方法)。

现在我们对这两种机制的工作方式有了一个小小的想法,我将尽力回答你原来的问题:

那么为什么.AsParallel()比Parallel.ForEach慢 ? 原因源于以下几点。 任务(或其在此处的等效实现) 不会阻止类似I / O的调用。 他们’等待’并释放CPU以做其他事情。 但是(引用C#nutshell book):“ PLINQ无法在不阻塞线程的情况下执行I / O绑定工作 ”。 这些电话是同步的 。 编写它们的目的是为了增加并行度,如果(并且只是如果)您正在执行诸如每个不占用CPU时间的任务下载网页之类的事情。

你的函数调用完全类似于I / O绑定调用的原因是:你的一个线程(称之为T)阻塞并且在它的所有子线程完成之前什么也不做,这可能是一个缓慢的过程。 T本身不是CPU密集型,而是等待孩子们解锁, 除了等待之外什么都不做 。 因此,它与典型的I / O绑定函数调用相同。

根据AsParallel究竟如何工作的公认答案?

.AsParallel.ForAll()在调用.ForAll()之前强制转换回IEnumerable

所以它创建了1个新线程+ N个递归调用(每个调用生成一个新线程)。