Limiting the number of threads in the Task Parallel Library

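I have a few hundred files that I need to upload to Azure Blob storage.
I want to use the Task Parallel Library.
But instead of starting all 100 upload threads at once in a foreach over the list of files, how can I put a limit on the maximum number of threads it can use and still finish the job in parallel? Or does it balance things automatically?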

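You should not be using threads for this at all. There is a Task-based API for this that is naturally asynchronous: CloudBlockBlob.UploadFromFileAsync. Use it with async/await and SemaphoreSlim to throttle the number of parallel uploads.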

Example (untested):

    const int MAX_PARALLEL_UPLOADS = 5;

    async Task UploadFiles()
    {
        var files = new List<string>();
        // ... add files to the list

        // init the blob block and
        // upload files asynchronously
        // (CloudBlockBlob is not IDisposable, so it does not go in a using block)
        var blobBlock = new CloudBlockBlob(url, credentials);
        using (var semaphore = new SemaphoreSlim(MAX_PARALLEL_UPLOADS))
        {
            var tasks = files.Select(async (filename) =>
            {
                // Wait for a free slot; at most MAX_PARALLEL_UPLOADS run at once.
                await semaphore.WaitAsync();
                try
                {
                    await blobBlock.UploadFromFileAsync(filename, FileMode.Create);
                }
                finally
                {
                    // Free the slot even if the upload throws.
                    semaphore.Release();
                }
            }).ToArray();

            await Task.WhenAll(tasks);
        }
    }
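To see the throttling without an Azure account, here is a minimal runnable sketch of the same SemaphoreSlim pattern. It assumes a hypothetical `FakeUploadAsync` that only delays for a moment in place of `UploadFromFileAsync`; the class name, file names, and counters are illustrative and not part of any Azure API.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ThrottleDemo
{
    const int MaxParallelUploads = 5;

    static readonly object Gate = new object();
    static int running; // uploads currently in flight
    static int peak;    // highest value 'running' has reached

    // Hypothetical stand-in for CloudBlockBlob.UploadFromFileAsync.
    static Task FakeUploadAsync(string fileName) => Task.Delay(200);

    static async Task Main()
    {
        var files = Enumerable.Range(1, 20).Select(i => $"file{i}.dat").ToList();

        using (var semaphore = new SemaphoreSlim(MaxParallelUploads))
        {
            var tasks = files.Select(async file =>
            {
                await semaphore.WaitAsync();
                try
                {
                    lock (Gate) { running++; peak = Math.Max(peak, running); }
                    await FakeUploadAsync(file);
                }
                finally
                {
                    lock (Gate) { running--; }
                    semaphore.Release();
                }
            }).ToArray();

            await Task.WhenAll(tasks);
        }

        // The semaphore guarantees this never exceeds MaxParallelUploads.
        Console.WriteLine($"Peak concurrent uploads: {peak}");
    }
}
```

Because the work is awaited rather than blocked on, no thread is tied up while an upload is in flight; the semaphore limits concurrent operations, not threads.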

Have you tried using MaxDegreeOfParallelism? Like this:

    System.Threading.Tasks.Parallel.Invoke(
        new System.Threading.Tasks.ParallelOptions { MaxDegreeOfParallelism = 5 },
        actionsArray);
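As a self-contained sketch of that call, assuming a made-up workload where each action just sleeps in place of a blocking upload (the class name and the contents of `actionsArray` here are illustrative):

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ParallelInvokeDemo
{
    static void Main()
    {
        // Hypothetical workload: each action stands in for one blocking upload.
        Action[] actionsArray = Enumerable.Range(1, 20)
            .Select(i => (Action)(() =>
            {
                Console.WriteLine($"Item {i} on thread {Environment.CurrentManagedThreadId}");
                Thread.Sleep(200);
            }))
            .ToArray();

        var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };

        // Runs the actions with at most 5 executing concurrently and
        // returns only after every action has finished.
        Parallel.Invoke(options, actionsArray);
    }
}
```

Note that MaxDegreeOfParallelism is an upper bound: the scheduler may use fewer concurrent workers than the value given.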

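Essentially, you will want to create an Action or a Task for each file to upload, put them in a List, and then process that list while limiting how many can run in parallel.

My blog post shows how to do this with both Tasks and Actions, and provides a sample project you can download and run to see both in action.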

With Actions

If you are using Actions, you can use the built-in .NET Parallel.Invoke function. Here we limit it to running at most 5 threads in parallel.

    var listOfActions = new List<Action>();
    foreach (var file in files)
    {
        var localFile = file;
        // Note that we create the Action here, but do not run it.
        // Wait() blocks until the upload finishes, so the limit below
        // applies to the uploads themselves.
        listOfActions.Add(() => blobBlock.UploadFromFileAsync(localFile, FileMode.Create).Wait());
    }

    var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };
    Parallel.Invoke(options, listOfActions.ToArray());

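This option does not take advantage of the asynchronous nature of UploadFromFileAsync, though, so you may prefer the Task example below.

With Tasks

With Tasks there is no built-in function. However, you can use the one I provide on my blog.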

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
    {
        await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
    }

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
    /// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
    {
        // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
        var tasks = tasksToRun.ToList();

        using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
        {
            var postTaskTasks = new List<Task>();

            // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
            tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

            // Start running each task.
            foreach (var task in tasks)
            {
                // Increment the number of tasks currently running and wait if too many are running.
                await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);

                cancellationToken.ThrowIfCancellationRequested();
                task.Start();
            }

            // Wait for all of the provided tasks to complete.
            // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
            await Task.WhenAll(postTaskTasks.ToArray());
        }
    }

Then, to create your list of Tasks and call the function so that at most 5 run simultaneously, you could do this:

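    var listOfTasks = new List<Task>();
    foreach (var file in files)
    {
        var localFile = file;
        // Note that we create the Task here, but do not start it.
        // The delegate blocks on the upload: new Task(async () => ...) would bind to an
        // async void delegate, so the Task would complete at the first await and
        // release its throttler slot while the upload was still running.
        listOfTasks.Add(new Task(() => blobBlock.UploadFromFileAsync(localFile, FileMode.Create).Wait()));
    }
    await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 5);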

As for whether it balances things automatically, you can find out by running this:

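    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;

    class Program
    {
        static void Main(string[] args)
        {
            var list = new List<int>();
            for (int i = 0; i < 100; i++)
            {
                list.Add(i);
            }

            var runningIndex = 0;

            // Background reporter: prints how many items have been started so far.
            Task.Factory.StartNew(() => Action(ref runningIndex));

            Parallel.ForEach(list, i =>
            {
                // Interlocked keeps the shared counter correct across threads.
                Interlocked.Increment(ref runningIndex);
                Console.WriteLine(i);
                Thread.Sleep(3000);
            });

            Console.ReadKey();
        }

        private static void Action(ref int number)
        {
            while (true)
            {
                Console.WriteLine("worked through {0}", number);
                Thread.Sleep(2900);
            }
        }
    }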

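As you can see, the degree of parallelism is low at the start, grows larger, and drops again toward the end. So there is definitely some kind of automatic optimization going on.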
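If you would rather set a hard cap than rely on that automatic behaviour, Parallel.ForEach also accepts a ParallelOptions argument. A minimal sketch, assuming a blocking workload simulated with Thread.Sleep; the class name and the counters are illustrative only:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class ForEachCapDemo
{
    static readonly object Gate = new object();
    static int running; // iterations currently executing
    static int peak;    // highest value 'running' has reached

    static void Main()
    {
        var items = Enumerable.Range(0, 100).ToList();
        var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };

        Parallel.ForEach(items, options, i =>
        {
            lock (Gate) { running++; peak = Math.Max(peak, running); }
            Thread.Sleep(50); // hypothetical stand-in for a blocking upload
            lock (Gate) { running--; }
        });

        // With the cap in place this stays at or below 5.
        Console.WriteLine($"Peak concurrent iterations: {peak}");
    }
}
```

Removing the `options` argument hands the decision back to the scheduler, which is the behaviour the program above observes.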