如何限制多个异步任务?
我有一些以下forms的代码:
static async Task DoSomething(int n) { ... } static void RunThreads(int totalThreads, int throttle) { var tasks = new List(); for (var n = 0; n < totalThreads; n++) { var task = DoSomething(n); tasks.Add(task); } Task.WhenAll(tasks).Wait(); // all threads must complete }
麻烦的是,如果我没有限制线程,事情开始崩溃。 现在,我想启动最多的throttle
线程,并且只在旧线程完成时启动新线程。 我尝试过几种方法,迄今为止没有一种方法可行。 我遇到的问题包括:
-
tasks
集合必须完全填充所有任务,无论是活动还是等待执行,否则最终的.Wait()
调用只会查看它所启动的线程。 - 链接执行似乎需要使用
Task.Run()
等。 但是我需要从一开始就引用每个任务,并且实例化任务似乎会自动启动它,这是我不想要的。
这该怎么做?
微软的Reactive Extensions(Rx) – NuGet“Rx-Main” – 这个问题排序非常好。
这样做:
static void RunThreads(int totalThreads, int throttle) { Observable .Range(0, totalThreads) .Select(n => Observable.FromAsync(() => DoSomething(n))) .Merge(throttle) .Wait(); }
任务完成。
最简单的选择IMO是使用TPL Dataflow。 您只需创建一个ActionBLock
,按所需的并行度限制它并开始将项目发布到其中。 它确保只同时运行一定数量的任务,当任务完成时,它开始执行下一个项目:
async Task RunAsync(int totalThreads, int throttle) { var block = new ActionBlock( DoSomething, new ExecutionDataFlowOptions { MaxDegreeOfParallelism = throttle }); for (var n = 0; n < totalThreads; n++) { block.Post(n); } block.Complete(); await block.Completion; }
Stephen Toub在他的基于任务的异步模式文档中提供了以下关于限制的示例。
const int CONCURRENCY_LEVEL = 15; Uri [] urls = …; int nextIndex = 0; var imageTasks = new List>(); while(nextIndex < CONCURRENCY_LEVEL && nextIndex < urls.Length) { imageTasks.Add(GetBitmapAsync(urls[nextIndex])); nextIndex++; } while(imageTasks.Count > 0) { try { Task imageTask = await Task.WhenAny(imageTasks); imageTasks.Remove(imageTask); Bitmap image = await imageTask; panel.AddImage(image); } catch(Exception exc) { Log(exc); } if (nextIndex < urls.Length) { imageTasks.Add(GetBitmapAsync(urls[nextIndex])); nextIndex++; } }
如果我理解正确,你可以启动任务由throttle
参数提到的有限数量的任务,并等待它们完成后再开始..
要在开始新任务之前等待所有已启动的任务完成,请使用以下实现。
static async Task RunThreads(int totalThreads, int throttle) { var tasks = new List(); for (var n = 0; n < totalThreads; n++) { var task = DoSomething(n); tasks.Add(task); if (tasks.Count == throttle) { await Task.WhenAll(tasks); tasks.Clear(); } } await Task.WhenAll(tasks); // wait for remaining }
要在完成任务时添加任务,可以使用以下代码
static async Task RunThreads(int totalThreads, int throttle) { var tasks = new List(); for (var n = 0; n < totalThreads; n++) { var task = DoSomething(n); tasks.Add(task); if (tasks.Count == throttle) { var completed = await Task.WhenAny(tasks); tasks.Remove(completed); } } await Task.WhenAll(tasks); // all threads must complete }
以下是一些基于Sriram Sakthivel答案的扩展方法变体。
在用法示例中,对DoSomething
调用被包装在显式转换的闭包中以允许传递参数。
public static async Task RunMyThrottledTasks() { var myArgsSource = new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 }; await myArgsSource .Select(a => (Func>)(() => DoSomething(a))) .Throttle(2); } public static async Task