如何并行运行一组函数并在完成后等待结果?

我需要在同一时间异步运行一组重函数,并在列表中填充结果。 这是伪代码:

List results = new List(); List<Func> tasks = PopulateTasks(); foreach(var task in tasks) { // Run Logic in question 1. Run each task asynchronously/parallely 2. Put the results in the results list upon each task completion } Console.WriteLine("All tasks completed and results populated"); 

我需要foreach bock里面的逻辑。 你们能帮助我吗?

我有一些约束:解决方案必须符合.net 3.5(不是.net 4,但是.net 4替代解决方案将因我的知识目的而受到赞赏)

提前致谢。

一个简单的3.5实现可能看起来像这样

 List results = new List(); List> tasks = PopulateTasks(); ManualResetEvent waitHandle = new ManualResetEvent(false); void RunTasks() { int i = 0; foreach(var task in tasks) { int captured = i++; ThreadPool.QueueUserWorkItem(state => RunTask(task, captured)) } waitHandle.WaitOne(); Console.WriteLine("All tasks completed and results populated"); } private int counter; private readonly object listLock = new object(); void RunTask(Func task, int index) { var res = task(...); //You haven't specified where the parameter comes from lock (listLock ) { results[index] = res; } if (InterLocked.Increment(ref counter) == tasks.Count) waitHandle.Set(); } 
 List> tasks = PopulateTasks(); TResult[] results = new TResult[tasks.Length]; Parallel.For(0, tasks.Count, i => { results[i] = tasks[i](); }); 

3.5的TPL显然存在 。

  public static IList RunAsync(IEnumerable> tasks) { List asyncContext = new List(); foreach (var task in tasks) { asyncContext.Add(task.BeginInvoke(null, null)); } return asyncContext; } public static IEnumerable WaitForAll(IEnumerable> tasks, IEnumerable asyncContext) { IEnumerator iterator = asyncContext.GetEnumerator(); foreach (var task in tasks) { iterator.MoveNext(); yield return task.EndInvoke(iterator.Current); } } public static void Main() { var tasks = Enumerable.Repeat>(() => ComputeValue(), 10).ToList(); var asyncContext = RunAsync(tasks); var results = WaitForAll(tasks, asyncContext); foreach (var result in results) { Console.WriteLine(result); } } public static int ComputeValue() { Thread.Sleep(1000); return Guid.NewGuid().ToByteArray().Sum(a => (int)a); } 

另一个变体是未来的小模式实现:

  public class Future { public Future(Func task) { Task = task; _asyncContext = task.BeginInvoke(null, null); } private IAsyncResult _asyncContext; public Func Task { get; private set; } public T Result { get { return Task.EndInvoke(_asyncContext); } } public bool IsCompleted { get { return _asyncContext.IsCompleted; } } } public static IList> RunAsync(IEnumerable> tasks) { List> asyncContext = new List>(); foreach (var task in tasks) { asyncContext.Add(new Future(task)); } return asyncContext; } public static IEnumerable WaitForAll(IEnumerable> futures) { foreach (var future in futures) { yield return future.Result; } } public static void Main() { var tasks = Enumerable.Repeat>(() => ComputeValue(), 10).ToList(); var futures = RunAsync(tasks); var results = WaitForAll(futures); foreach (var result in results) { Console.WriteLine(result); } } public static int ComputeValue() { Thread.Sleep(1000); return Guid.NewGuid().ToByteArray().Sum(a => (int)a); } 

传统的方式是使用Sempahore。 使用您正在使用的线程数初始化信号量,然后为每个任务启动一个线程并等待信号量对象。 当每个线程完成时,它应该增加信号量。 当信号量计数达到0时,正在等待的主线程将继续。

在单独的工作器实例中进行处理,每个实例都在自己的线程上。 使用回调来传回结果并通知调用进程线程已完成。 使用Dictionary来跟踪正在运行的线程。 如果你有很multithreading,你应该加载一个队列并启动新的线程。 在此示例中,在启动任何线程之前创建所有线程,以防止在最终线程启动之前运行线程计数降至零的竞争条件。

  Dictionary activeThreads = new Dictionary(); void LaunchWorkers() { foreach (var task in tasks) { Worker worker = new Worker(task, new WorkerDoneDelegate(ProcessResult)); Thread thread = new Thread(worker.Done); thread.IsBackground = true; activeThreads.Add(thread.ManagedThreadId, thread); } lock (activeThreads) { activeThreads.Values.ToList().ForEach(n => n.Start()); } } void ProcessResult(int threadId, TResult result) { lock (results) { results.Add(result); } lock (activeThreads) { activeThreads.Remove(threadId); // done when activeThreads.Count == 0 } } } public delegate void WorkerDoneDelegate(object results); class Worker { public WorkerDoneDelegate Done; public void Work(Task task, WorkerDoneDelegate Done) { // process task Done(Thread.CurrentThread.ManagedThreadId, result); } }