C#中的并行迭代?

有没有办法在C#中对并行枚举进行foreach样式迭代? 对于可订阅列表,我知道可以使用常规for循环迭代索引范围内的int,但出于多种原因我更喜欢foreach

奖励积分如果它在C#2.0中有效

简短的回答,没有。 foreach一次只能使用一个可枚举的。

但是,如果将并行枚举组合成一个,则可以foreach组合。 我不知道这样做的任何简单的内置方法,但以下应该工作(虽然我还没有测试过):

 public IEnumerable Combine(params object[] sources) { foreach(var o in sources) { // Choose your own exception if(!(o is IEnumerable)) throw new Exception(); } var enums = sources.Select(s => ((IEnumerable)s).GetEnumerator()) .ToArray(); while(enums.All(e => e.MoveNext())) { yield return enums.Select(e => e.Current).ToArray(); } } 

然后你可以foreach返回的可枚举:

 foreach(var v in Combine(en1, en2, en3)) { // Remembering that v is an array of the type contained in en1, // en2 and en3. } 

.NET 4的BlockingCollection使这很容易。 创建一个BlockingCollection,在枚举方法中返回它的.GetConsumingEnumerable()。 然后foreach只是添加到阻塞集合。

例如

 private BlockingCollection m_data = new BlockingCollection(); public IEnumerable GetData( IEnumerable> sources ) { Task.Factory.StartNew( () => ParallelGetData( sources ) ); return m_data.GetConsumingEnumerable(); } private void ParallelGetData( IEnumerable> sources ) { foreach( var source in sources ) { foreach( var item in source ) { m_data.Add( item ); }; } //Adding complete, the enumeration can stop now m_data.CompleteAdding(); } 

希望这可以帮助。 顺便说一句,我昨晚发布了一篇关于这个的博客

安德烈

Zooba的答案很好,但您可能还想查看“如何一次迭代两个数组”的答案。

我从.NET4 Parallel库中编写了一个EachParallel()的实现。 它与.NET 3.5兼容: C#3.5中的并行ForEach循环用法:

 string[] names = { "cartman", "stan", "kenny", "kyle" }; names.EachParallel(name => { try { Console.WriteLine(name); } catch { /* handle exception */ } }); 

执行:

 ///  /// Enumerates through each item in a list in parallel ///  public static void EachParallel(this IEnumerable list, Action action) { // enumerate the list so it can't change during execution list = list.ToArray(); var count = list.Count(); if (count == 0) { return; } else if (count == 1) { // if there's only one element, just execute it action(list.First()); } else { // Launch each method in it's own thread const int MaxHandles = 64; for (var offset = 0; offset < list.Count() / MaxHandles; offset++) { // break up the list into 64-item chunks because of a limitiation // in WaitHandle var chunk = list.Skip(offset * MaxHandles).Take(MaxHandles); // Initialize the reset events to keep track of completed threads var resetEvents = new ManualResetEvent[chunk.Count()]; // spawn a thread for each item in the chunk int i = 0; foreach (var item in chunk) { resetEvents[i] = new ManualResetEvent(false); ThreadPool.QueueUserWorkItem(new WaitCallback((object data) => { int methodIndex = (int)((object[])data)[0]; // Execute the method and pass in the enumerated item action((T)((object[])data)[1]); // Tell the calling thread that we're done resetEvents[methodIndex].Set(); }), new object[] { i, item }); i++; } // Wait for all threads to execute WaitHandle.WaitAll(resetEvents); } } } 

如果你想坚持基础 – 我以更简单的方式重写了当前接受的答案:

  public static IEnumerable Combine (this IEnumerable> sources) { var enums = sources .Select (s => s.GetEnumerator ()) .ToArray (); while (enums.All (e => e.MoveNext ())) { yield return enums.Select (e => e.Current).ToArray (); } } public static IEnumerable Combine (params IEnumerable[] sources) { return sources.Combine (); } 

这对你有用吗?

 public static class Parallel { public static void ForEach(IEnumerable[] sources, Action action) { foreach (var enumerable in sources) { ThreadPool.QueueUserWorkItem(source => { foreach (var item in (IEnumerable)source) action(item); }, enumerable); } } } // sample usage: static void Main() { string[] s1 = { "1", "2", "3" }; string[] s2 = { "4", "5", "6" }; IEnumerable[] sources = { s1, s2 }; Parallel.ForEach(sources, s => Console.WriteLine(s)); Thread.Sleep(0); // allow background threads to work } 

对于C#2.0,您需要将上面的lambda表达式转换为委托。

注意:此实用程序方法使用后台线程。 您可能希望将其修改为使用前台线程,并且您可能希望等到所有线程完成。 如果你这样做,我建议你创建sources.Length - 1线程,并使用当前执行的线程作为最后一个(或第一个)源。

(我希望我可以在我的代码中包含等待线程完成,但我很抱歉我不知道该怎么做。我想你应该使用 一个WaitHandle Thread.Join() 。)