为什么在Parallel.ForEach中每个线程多次调用localInit Func

我正在编写一些代码来处理大量数据,我认为让Parallel.ForEach为它创建的每个线程创建一个文件是有用的,因此输出不需要同步(至少由我来)。

它看起来像这样:

Parallel.ForEach(vals, new ParallelOptions { MaxDegreeOfParallelism = 8 }, ()=>GetWriter(), // returns a new BinaryWriter backed by a file with a guid name (item, state, writer)=> { if(something) { state.Break(); return writer; } List results = new List(); foreach(var subItem in item.SubItems) results.Add(ProcessItem(subItem)); if(results.Count > 0) { foreach(var result in results) result.Write(writer); } return writer; }, (writer)=>writer.Dispose()); 

我期望发生的是,最多可以创建8个文件并在整个运行时间内持续存在。 然后在整个ForEach调用结束时,每个都将被处理。 真正发生的是localInit似乎每个项目都被调用一次,所以我最终得到了数百个文件。 作者也被处理在每个处理项目的末尾。

这表明发生了同样的事情:

 var vals = Enumerable.Range(0, 10000000).ToArray(); long sum = 0; Parallel.ForEach(vals, new ParallelOptions { MaxDegreeOfParallelism = 8 }, () => { Console.WriteLine("init " + Thread.CurrentThread.ManagedThreadId); return 0L; }, (i, state, common) => { Thread.Sleep(10); return common + i; }, (common) => Interlocked.Add(ref sum, common)); 

我知道了:

 init 10 init 14 init 11 init 13 init 12 init 14 init 11 init 12 init 13 init 11 ... // hundreds of lines over < 30 seconds init 14 init 11 init 18 init 17 init 10 init 11 init 14 init 11 init 14 init 11 init 18 

注意:如果我省略Thread.Sleep调用,它有时似乎“正确”运行。 对于它决定在我的电脑上使用的4个线程,localInit只被调用一次。 然而,并非每次都如此。

这是函数的期望行为吗? 幕后发生了什么导致它这样做? 最后,什么是获得我想要的function的好方法,ThreadLocal?

顺便说一句,这是在.NET 4.5上。

Parallel.ForEach不像你想象的那样工作。 重要的是要注意该方法是在Task类之上构建的,并且TaskThread之间的关系不是1:1 。 例如,您可以拥有在2个托管线程上运行的10个任务。

尝试在方法体中使用此行而不是当前行:

 Console.WriteLine("ThreadId {0} -- TaskId {1} ", Thread.CurrentThread.ManagedThreadId, Task.CurrentId); 

您应该看到ThreadId将在许多不同的任务中重用,由其唯一的ID显示。 如果你离开或增加了对Thread.Sleep的调用,你会看到更多。

关于Parallel.ForEach方法如何工作的(非常)基本思想是,它使您的枚举创建了一系列将运行枚举的过程部分的任务,这样做的方式很大程度上取决于输入。 还有一些特殊的逻辑可以检查任务超过一定毫秒数而不完成的情况。 如果这种情况属实,则可能会产生新任务以帮助减轻工作。

如果您查看Parallel.ForEach localinit函数的文档,您会注意到它returns the initial state of the local data for each _task_ ,而不是每个线程

您可能会问为什么生成的任务超过8个。 答案类似于最后一个,在ParallelOptions.MaxDegreeOfParallelism的文档中找到。

从默认值更改MaxDegreeOfParallelism仅限制将使用多少并发任务。

此限制仅适用于并发任务的数量,而不是对其在整个处理期间创建的任务数量的硬限制。 正如我上面提到的,有时会生成一个单独的任务,这会导致您的localinit函数被多次调用并将数百个文件写入磁盘。

写入磁盘肯定是一种具有一点延迟的操作,特别是如果您使用的是同步I / O. 当磁盘操作发生时,它会阻塞整个线程; Thread.Sleep 。 如果Task执行此操作,它将阻止当前运行的线程,并且不能在其上运行任何其他任务。 通常在这些情况下,调度程序将生成一个新的Task来帮助解决这个问题。

最后,什么是获得我想要的function的好方法,ThreadLocal?

最重要的是,线程本地对Parallel.ForEach没有意义,因为你没有处理线程; 你正在处理任务。 可以在任务之间共享本地线程,因为许多任务可以同时使用同一个线程。 此外,任务的线程本地可以改变执行中期,因为调度程序可以抢占它,然后继续执行不同的线程,这将在本地具有不同的线程。

我不确定最好的方法,但你可以依靠localinit函数传入你想要的任何资源,只允许一次在一个线程中使用资源。 您可以使用localfinally将其标记为不再使用,因此可用于获取其他任务。 这就是那些方法的设计目的; 每个方法仅在每个生成的任务中调用一次(请参阅Parallel.ForEach MSDN文档的备注部分)。

您也可以自己拆分工作,创建自己的一组线程并运行您的工作。 但是,在我看来,这不是一个想法,因为Parallel类已经为你做了这么繁重的工作。

您所看到的是尝试尽快完成工作的实施。

为此,它尝试使用不同数量的任务来最大化吞吐量。 它从线程池中获取一定数量的线程并运行您的工作。 然后它尝试添加和删除线程以查看发生的情况。 它会继续这样做,直到你的所有工作完成。

该算法非常愚蠢,因为它不知道你的工作是使用了大量的CPU,还是大量的IO,或者即使有很多同步并且线程相互阻塞。 它所能做的就是添加和删除线程,并测量每个工作单元的完成速度。

这意味着它会在注入和退出线程时不断调用localInitlocalFinally函数 – 这就是你所发现的。

不幸的是,没有简单的方法来控制这种算法。 Parallel.ForEach是一个高级构造,故意隐藏大部分线程管理代码。


使用ThreadLocal可能会有所帮助,但它依赖于当Parallel.ForEach请求新线程时线程池将重用相同的线程这一事实。 这不是保证 – 实际上,线程池不可能完全使用8个线程进行整个调用。 这意味着您将再次创建超出必要的文件。


有一点可以保证, Parallel.ForEach在任何时候都不会使用超过MaxDegreeOfParallelism线程。

您可以通过创建固定大小的“池”文件来使用它,这些文件可以在特定时间运行的任何线程重用。 您知道只有MaxDegreeOfParallelism线程可以一次运行,因此您可以在调用ForEach之前创建该数量的文件。 然后在localInit抓取一个并在localInit释放它。

当然,你必须自己编写这个池,它必须是线程安全的,因为它将被同时调用。 但是,一个简单的锁定策略应该足够好,因为与锁的成本相比,线程不会非常快速地注入和退出。

根据MSDN , localInit方法为每个任务调用一次,而不是为每个线程调用:

对参与循环执行的每个任务调用一次localInit委托,并为每个任务返回初始本地状态。

在线程创建时调用localInit。 如果body需要这么长时间,它必须创建另一个线程并挂起当前线程,如果它创建另一个线程,则调用localInit

当Parallel.ForEach调用它时,也会创建与MaxDegreeOfParallelism值一样多的线程,例如:

 var k = Enumerable.Range(0, 1); Parallel.ForEach(k,new ParallelOptions(){MaxDegreeOfParallelism = 4}..... 

它首先调用时创建4个线程