并行地以顺序方式执行N个线程

我有一个应用程序,我有1个大文件的1000多个小部分。

我必须一次上传最多16个零件。

我使用.Net的Thread并行库。

我使用Parallel.For来划分多个部分并分配1个应该为每个部分执行的方法,并将DegreeOfParallelism设置为16。

我需要使用由不同部分上传生成的校验和值执行1方法,因此我必须设置某些机制,我必须等待所有部件上传说1000完成。 在TPL库中,我面临的问题是它是从1000中随机执行16个线程中的任何一个。

我想要一些机制,我可以在最初运行前16个线程,如果第一个或第二个或任何16个线程完成其任务,则应该启动第17个部分。

我怎样才能做到这一点?

在此处输入图像描述

这是手动执行此操作的方法。

你需要一个队列。 队列是待处理任务的序列。 你必须将它们出列并将它们放入工作任务列表中。 完成任务后,将其从工作任务列表中删除,然后从队列中取出另一个任务。 主线程控制此过程。 以下是如何执行此操作的示例。

对于测试,我使用了整数列表,但它应该适用于其他类型,因为它使用generics。

private static void Main() { Random r = new Random(); var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList(); ParallelQueue(items, DoWork); } private static void ParallelQueue(List items, Action action) { Queue pending = new Queue(items); List working = new List(); while (pending.Count + working.Count != 0) { if (pending.Count != 0 && working.Count < 16) // Maximum tasks { var item = pending.Dequeue(); // get item from queue working.Add(Task.Run(() => action((T)item))); // run task } else { Task.WaitAny(working.ToArray()); working.RemoveAll(x => x.IsCompleted); // remove finished tasks } } } private static void DoWork(int i) // do your work here. { // this is just an example Task.Delay(i).Wait(); Console.WriteLine(i); } 

如果您遇到如何为自己实施DoWork的问题,请告诉我。 因为如果您更改方法签名,您可能需要进行一些更改。

更新

您也可以使用async await执行此操作而不阻塞主线程。

 private static void Main() { Random r = new Random(); var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList(); Task t = ParallelQueue(items, DoWork); // able to do other things. t.Wait(); } private static async Task ParallelQueue(List items, Func func) { Queue pending = new Queue(items); List working = new List(); while (pending.Count + working.Count != 0) { if (working.Count < 16 && pending.Count != 0) { var item = pending.Dequeue(); working.Add(Task.Run(async () => await func((T)item))); } else { await Task.WhenAny(working); working.RemoveAll(x => x.IsCompleted); } } } private static async Task DoWork(int i) { await Task.Delay(i); } 

一个可能的候选者可以是TPL Dataflow 。 这是一个演示,它接收整数流并将它们打印到控制台。 您将MaxDegreeOfParallelism设置为您希望并行旋转的多个线程:

 void Main() { var actionBlock = new ActionBlock( i => Console.WriteLine(i), new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 16}); foreach (var i in Enumerable.Range(0, 200)) { actionBlock.Post(i); } } 

如果您想拥有多个生产者/消费者,这也可以很好地扩展。

 var workitems = ... /*eg Enumerable.Range(0, 1000000)*/; SingleItemPartitioner.Create(workitems) .AsParallel() .AsOrdered() .WithDegreeOfParallelism(16) .WithMergeOptions(ParallelMergeOptions.NotBuffered) .ForAll(i => { Thread.Slee(1000); Console.WriteLine(i); }); 

这应该是你所需要的。 我忘了这些方法是如何命名的……看看文档。

在睡眠1秒后打印到控制台进行测试(此示例代码执行此操作)。

另一个选择是使用BlockingCollection作为文件读取器线程和16个上传器线程之间的队列。 每个上传者线程都会循环使用阻塞集合,直到它完成为止。

并且,如果要限制队列中的内存消耗,可以设置阻塞集合的上限,以便在缓冲区达到容量时文件读取器线程将暂停。 这在您可能需要限制每个用户/ API调用使用的内存的服务器环境中特别有用。

 // Create a buffer of 4 chunks between the file reader and the senders BlockingCollection queue = new BlockingCollection(4); // Create a cancellation token source so you can stop this gracefully CancellationTokenSource cts = ... 

文件阅读器线程

 ... queue.Add(chunk, cts.Token); ... queue.CompleteAdding(); 

发送线程

 for(int i = 0; i < 16; i++) { Task.Run(() => { foreach (var chunk in queue.GetConsumingEnumerable(cts.Token)) { .. do the upload } }); }