如何限制多个异步任务?

我有一些以下forms的代码:

static async Task DoSomething(int n) { ... } static void RunThreads(int totalThreads, int throttle) { var tasks = new List(); for (var n = 0; n < totalThreads; n++) { var task = DoSomething(n); tasks.Add(task); } Task.WhenAll(tasks).Wait(); // all threads must complete } 

麻烦的是,如果我没有限制线程,事情开始崩溃。 现在,我想启动最多的throttle线程,并且只在旧线程完成时启动新线程。 我尝试过几种方法,迄今为止没有一种方法可行。 我遇到的问题包括:

  • tasks集合必须完全填充所有任务,无论是活动还是等待执行,否则最终的.Wait()调用只会查看它所启动的线程。
  • 链接执行似乎需要使用Task.Run()等。 但是我需要从一开始就引用每个任务,并且实例化任务似乎会自动启动它,这是我不想要的。

这该怎么做?

微软的Reactive Extensions(Rx) – NuGet“Rx-Main” – 这个问题排序非常好。

这样做:

 static void RunThreads(int totalThreads, int throttle) { Observable .Range(0, totalThreads) .Select(n => Observable.FromAsync(() => DoSomething(n))) .Merge(throttle) .Wait(); } 

任务完成。

最简单的选择IMO是使用TPL Dataflow。 您只需创建一个ActionBLock ,按所需的并行度限制它并开始将项目发布到其中。 它确保只同时运行一定数量的任务,当任务完成时,它开始执行下一个项目:

 async Task RunAsync(int totalThreads, int throttle) { var block = new ActionBlock( DoSomething, new ExecutionDataFlowOptions { MaxDegreeOfParallelism = throttle }); for (var n = 0; n < totalThreads; n++) { block.Post(n); } block.Complete(); await block.Completion; } 

Stephen Toub在他的基于任务的异步模式文档中提供了以下关于限制的示例。

 const int CONCURRENCY_LEVEL = 15; Uri [] urls = …; int nextIndex = 0; var imageTasks = new List>(); while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length) { imageTasks.Add(GetBitmapAsync(urls[nextIndex])); nextIndex++; } while(imageTasks.Count > 0) { try { Task imageTask = await Task.WhenAny(imageTasks); imageTasks.Remove(imageTask); Bitmap image = await imageTask; panel.AddImage(image); } catch(Exception exc) { Log(exc); } if (nextIndex < urls.Length) { imageTasks.Add(GetBitmapAsync(urls[nextIndex])); nextIndex++; } } 

如果我理解正确,你可以启动任务由throttle参数提到的有限数量的任务,并等待它们完成后再开始..

要在开始新任务之前等待所有已启动的任务完成,请使用以下实现。

 static async Task RunThreads(int totalThreads, int throttle) { var tasks = new List(); for (var n = 0; n < totalThreads; n++) { var task = DoSomething(n); tasks.Add(task); if (tasks.Count == throttle) { await Task.WhenAll(tasks); tasks.Clear(); } } await Task.WhenAll(tasks); // wait for remaining } 

要在完成任务时添加任务,可以使用以下代码

 static async Task RunThreads(int totalThreads, int throttle) { var tasks = new List(); for (var n = 0; n < totalThreads; n++) { var task = DoSomething(n); tasks.Add(task); if (tasks.Count == throttle) { var completed = await Task.WhenAny(tasks); tasks.Remove(completed); } } await Task.WhenAll(tasks); // all threads must complete } 

以下是一些基于Sriram Sakthivel答案的扩展方法变体。

在用法示例中,对DoSomething调用被包装在显式转换的闭包中以允许传递参数。

 public static async Task RunMyThrottledTasks() { var myArgsSource = new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 }; await myArgsSource .Select(a => (Func>)(() => DoSomething(a))) .Throttle(2); } public static async Task DoSomething(int arg) { // Await some async calls that need arg.. // ..then return result async Task.. return new object(); } public static async Task> Throttle(IEnumerable>> toRun, int throttleTo) { var running = new List>(throttleTo); var completed = new List>(toRun.Count()); foreach(var taskToRun in toRun) { running.Add(taskToRun()); if(running.Count == throttleTo) { var comTask = await Task.WhenAny(running); running.Remove(comTask); completed.Add(comTask); } } return completed.Select(t => t.Result); } public static async Task Throttle(this IEnumerable> toRun, int throttleTo) { var running = new List(throttleTo); foreach(var taskToRun in toRun) { running.Add(taskToRun()); if(running.Count == throttleTo) { var comTask = await Task.WhenAny(running); running.Remove(comTask); } } }