Parallel ForEach的本地初始化如何工作?

我不确定在Parallel.ForEach中使用本地init函数,如msdn文章中所述: http : //msdn.microsoft.com/en-us/library/dd997393.aspx

Parallel.ForEach(nums, // source collection () => 0, // method to initialize the local variable (j, loop, subtotal) => // method invoked by the loop on each iteration { subtotal += nums[j]; //modify local variable return subtotal; // value to be passed to next iteration },... 

()=> 0如何初始化任何东西? 变量的名称是什么以及如何在循环逻辑中使用它?

参考Parallel.ForEach静态扩展方法的以下重载 :

 public static ParallelLoopResult ForEach( IEnumerable source, Func localInit, Func taskBody, Action localFinally ) 

在您的具体示例中

这条线:

 () => 0, // method to initialize the local variable 

只是一个lambda(匿名函数),它将返回常量整数零。 这个lambda作为localInit参数传递给Parallel.ForEach – 因为lambda返回一个整数,它的类型为Func ,类型TLocal可以由编译器推断为int (类似地, TSource可以从类型中推断出来)作为参数source传递的集合)

然后将返回值(0)作为第3个参数(命名为subtotal )传递给taskBody Func 。 这个(0)用于身体循环的初始种子:

 (j, loop, subtotal) => { subtotal += nums[j]; //modify local variable (Bad idea, see comment) return subtotal; // value to be passed to next iteration } 

第二个lambda(传递给taskBody )被称为N次,其中N是TPL分区器分配给该任务的项目数。

对第二个taskBody lambda的每次后续调用都将传递taskBody的新值,有效地计算此Task的运行部分总数。 在添加了分配给此任务的所有项目之后,将再次调用第三个和最后一个localFinally函数参数,再次传递taskBody返回的subtotal的最终值。 由于几个此类任务将并行运行,因此还需要最后一步将所有部分总计加到最终的“总计”总数中。 但是,因为多个并发任务(在不同的线程上)可以争用grandTotal变量,所以对它的更改以线程安全的方式完成是很重要的。

(我已经更改了MSDN变量的名称以使其更清晰)

 long grandTotal = 0; Parallel.ForEach(nums, // source collection () => 0, // method to initialize the local variable (j, loop, subtotal) => // method invoked by the loop on each iteration subtotal + nums[j], // value to be passed to next iteration subtotal // The final value of subtotal is passed to the localFinally function parameter (subtotal) => Interlocked.Add(ref grandTotal, subtotal) 

在MS示例中,修改任务主体内部的参数小计是一种不好的做法,并且是不必要的。 即代码subtotal += nums[j]; return subtotal; subtotal += nums[j]; return subtotal; 会更好,因为只return subtotal + nums[j]; 可以缩写为lambda速记投影(j, loop, subtotal) => subtotal + nums[j]

一般来说

Parallel.For / Parallel.ForEach的localInit / body / localFinally重载允许在任务执行taskBody迭代之前和之后(分别)运行每个任务初始化和清理代码。

(注意For range / Enumerable传递给并行的For / Foreach将被分区为IEnumerable<>批处理,每个都将被分配一个Task)

每个任务中localInit将被调用一次, body代码将被重复调用,每个项目批量调用一次( 0..N次),并且localFinally将在完成后调用一次。

此外,您可以通过localInit Func的通用TLocal返回值传递任务持续时间所需的任何状态(即到taskBodylocalFinally委托) – 我在下面调用了此变量taskLocals

“localInit”的常见用法:

  • 创建和初始化循环体所需的昂贵资源,如数据库连接或Web服务连接。
  • 保持任务 – 本地变量保持(无竞争)运行总计或集合
  • 如果你需要从localInit返回多个对象到taskBodylocalFinally ,你可以使用一个强类型的类,一个Tuple<,,>或者,如果你只使用lambdas作为localInit / taskBody / localFinally ,你也可以通过数据通过匿名类。 请注意,如果使用localInit的返回localInit在多个任务之间共享引用类型,则需要考虑此对象的线程安全性 – 不可变性更可取。

“localFinally”行动的常见用法:

  • 释放诸如taskLocals使用的IDisposables资源(例如数据库连接,文件句柄,Web服务客户端等)
  • 将每个任务完成的工作聚合/组合/减少回共享变量。 这些共享变量将被争用,因此线程安全是一个问题:
    • 例如Interlocked.Increment对原始类型如整数
    • 写入操作将需要lock或类似操作
    • 利用并发集合来节省时间和精力。

taskBody是循环操作的tight部分 – 您需要优化它以获得性能。

这是用评论的例子总结的最好的总结:

 public void MyParallelizedMethod() { // Shared variable. Not thread safe var itemCount = 0; Parallel.For(myEnumerable, // localInit - called once per Task. () => { // Local `task` variables have no contention // since each Task can never run by multiple threads concurrently var sqlConnection = new SqlConnection("connstring..."); sqlConnection.Open(); // This is the `task local` state we wish to carry for the duration of the task return new { Conn = sqlConnection, RunningTotal = 0 } }, // Task Body. Invoked once per item in the batch assigned to this task (item, loopState, taskLocals) => { // ... Do some fancy Sql work here on our task's independent connection using(var command = taskLocals.Conn.CreateCommand()) using(var reader = command.ExecuteReader(...)) { if (reader.Read()) { // No contention for `taskLocal` taskLocals.RunningTotal += Convert.ToInt32(reader["countOfItems"]); } } // The same type of our `taskLocal` param must be returned from the body return taskLocals; }, // LocalFinally called once per Task after body completes // Also takes the taskLocal (taskLocals) => { // Any cleanup work on our Task Locals (as you would do in a `finally` scope) if (taskLocals.Conn != null) taskLocals.Conn.Dispose(); // Do any reduce / aggregate / synchronisation work. // NB : There is contention here! Interlocked.Add(ref itemCount, taskLocals.RunningTotal); } 

还有更多例子:

每任务无竞争词典的示例

每任务数据库连接的示例

作为@Honza Brestan答案的延伸。 Parallel foreach将工作分成任务的方式也很重要,它会将几个循环迭代分组到一个任务中,因此实际上localInit()会在循环的每n次迭代中调用一次,并且可以同时启动多个组。

localInitlocalFinally是确保并行foreach循环可以将每个itteration的结果组合成单个结果,而无需在body指定lock语句,为此,必须提供所需值的初始化create( localInit )然后每个body itteration可以处理本地值,然后提供一种方法以线程安全的方式组合每个组( localFinally )的值。

如果您不需要localInit来同步任务,则可以使用lambda方法正常引用周围上下文中的值而不会出现任何问题。 有关使用localInit / Finally和使用本地值向下滚动到优化的更深入的教程,请参阅C#中的线程(Parallel.For和Parallel.ForEach) ,Joseph Albahari实际上是我所有线程的goto源代码。

您可以在正确的Parallel.ForEach重载中获得MSDN的提示。

对参与循环执行的每个线程调用localInit委托一次,并为每个任务返回初始本地状态。 这些初始状态将传递给每个任务的第一个正文调用。 然后,每个后续的主体调用都会返回一个可能已修改的状态值,该值将传递给下一个主体调用。

在您的示例中() => 0是一个只返回0的委托,因此该值用于每个任务的第一次迭代。

从我身边稍微简单一点的例子

 class Program { class Person { public int Id { get; set; } public string Name { get; set; } public int Age { get; set; } } static List GetPerson() => new List() { new Person() { Id = 0, Name = "Artur", Age = 26 }, new Person() { Id = 1, Name = "Edward", Age = 30 }, new Person() { Id = 2, Name = "Krzysiek", Age = 67 }, new Person() { Id = 3, Name = "Piotr", Age = 23 }, new Person() { Id = 4, Name = "Adam", Age = 11 }, }; static void Main(string[] args) { List persons = GetPerson(); int ageTotal = 0; Parallel.ForEach ( persons, () => 0, (person, loopState, subtotal) => subtotal + person.Age, (subtotal) => Interlocked.Add(ref ageTotal, subtotal) ); Console.WriteLine($"Age total: {ageTotal}"); Console.ReadKey(); } }