C#中的并行迭代?
有没有办法在C#中对并行枚举进行foreach
样式迭代? 对于可订阅列表,我知道可以使用常规for
循环迭代索引范围内的int,但出于多种原因我更喜欢foreach
。
奖励积分如果它在C#2.0中有效
简短的回答,没有。 foreach
一次只能使用一个可枚举的。
但是,如果将并行枚举组合成一个,则可以foreach
组合。 我不知道这样做的任何简单的内置方法,但以下应该工作(虽然我还没有测试过):
public IEnumerable Combine(params object[] sources) { foreach(var o in sources) { // Choose your own exception if(!(o is IEnumerable )) throw new Exception(); } var enums = sources.Select(s => ((IEnumerable )s).GetEnumerator()) .ToArray(); while(enums.All(e => e.MoveNext())) { yield return enums.Select(e => e.Current).ToArray(); } }
然后你可以foreach
返回的可枚举:
foreach(var v in Combine(en1, en2, en3)) { // Remembering that v is an array of the type contained in en1, // en2 and en3. }
.NET 4的BlockingCollection使这很容易。 创建一个BlockingCollection,在枚举方法中返回它的.GetConsumingEnumerable()。 然后foreach只是添加到阻塞集合。
例如
private BlockingCollection m_data = new BlockingCollection (); public IEnumerable GetData( IEnumerable> sources ) { Task.Factory.StartNew( () => ParallelGetData( sources ) ); return m_data.GetConsumingEnumerable(); } private void ParallelGetData( IEnumerable> sources ) { foreach( var source in sources ) { foreach( var item in source ) { m_data.Add( item ); }; } //Adding complete, the enumeration can stop now m_data.CompleteAdding(); }
希望这可以帮助。 顺便说一句,我昨晚发布了一篇关于这个的博客
安德烈
Zooba的答案很好,但您可能还想查看“如何一次迭代两个数组”的答案。
我从.NET4 Parallel库中编写了一个EachParallel()的实现。 它与.NET 3.5兼容: C#3.5中的并行ForEach循环用法:
string[] names = { "cartman", "stan", "kenny", "kyle" }; names.EachParallel(name => { try { Console.WriteLine(name); } catch { /* handle exception */ } });
执行:
/// /// Enumerates through each item in a list in parallel /// public static void EachParallel(this IEnumerable list, Action action) { // enumerate the list so it can't change during execution list = list.ToArray(); var count = list.Count(); if (count == 0) { return; } else if (count == 1) { // if there's only one element, just execute it action(list.First()); } else { // Launch each method in it's own thread const int MaxHandles = 64; for (var offset = 0; offset < list.Count() / MaxHandles; offset++) { // break up the list into 64-item chunks because of a limitiation // in WaitHandle var chunk = list.Skip(offset * MaxHandles).Take(MaxHandles); // Initialize the reset events to keep track of completed threads var resetEvents = new ManualResetEvent[chunk.Count()]; // spawn a thread for each item in the chunk int i = 0; foreach (var item in chunk) { resetEvents[i] = new ManualResetEvent(false); ThreadPool.QueueUserWorkItem(new WaitCallback((object data) => { int methodIndex = (int)((object[])data)[0]; // Execute the method and pass in the enumerated item action((T)((object[])data)[1]); // Tell the calling thread that we're done resetEvents[methodIndex].Set(); }), new object[] { i, item }); i++; } // Wait for all threads to execute WaitHandle.WaitAll(resetEvents); } } }
如果你想坚持基础 – 我以更简单的方式重写了当前接受的答案:
public static IEnumerable Combine (this IEnumerable> sources) { var enums = sources .Select (s => s.GetEnumerator ()) .ToArray (); while (enums.All (e => e.MoveNext ())) { yield return enums.Select (e => e.Current).ToArray (); } } public static IEnumerable Combine (params IEnumerable [] sources) { return sources.Combine (); }
这对你有用吗?
public static class Parallel { public static void ForEach(IEnumerable [] sources, Action action) { foreach (var enumerable in sources) { ThreadPool.QueueUserWorkItem(source => { foreach (var item in (IEnumerable )source) action(item); }, enumerable); } } } // sample usage: static void Main() { string[] s1 = { "1", "2", "3" }; string[] s2 = { "4", "5", "6" }; IEnumerable[] sources = { s1, s2 }; Parallel.ForEach(sources, s => Console.WriteLine(s)); Thread.Sleep(0); // allow background threads to work }
对于C#2.0,您需要将上面的lambda表达式转换为委托。
注意:此实用程序方法使用后台线程。 您可能希望将其修改为使用前台线程,并且您可能希望等到所有线程完成。 如果你这样做,我建议你创建sources.Length - 1
线程,并使用当前执行的线程作为最后一个(或第一个)源。
(我希望我可以在我的代码中包含等待线程完成,但我很抱歉我不知道该怎么做。我想你应该使用 一个 WaitHandle
Thread.Join()
。)