如何限制c#中的最大并行任务数

我有一个1000输入消息的集合来处理。 我正在循环输入集合并为每个消息启动新任务以进行处理。

//Assume this messages collection contains 1000 items var messages = new List(); foreach (var msg in messages) { Task.Factory.StartNew(() => { Process(msg); }); } 

我们可以猜测当时同时处理多少个最大消息(假设是普通的四核处理器),还是我们可以限制当时要处理的最大消息数?

如何确保以与Collection相同的顺序/顺序处理此消息?

在这种情况下,SemaphoreSlim是一个非常好的解决方案,我强烈建议OP尝试这个,但@ Manoj的答案有注释中提到的缺陷。在产生这样的任务之前应该等待。

更新后的答案:由于@Vasyl指出信号量可能在任务完成之前被释放,并且在调用Release()方法时会引发exception,因此在退出使用块之前必须等待所有创建的任务完成。

 int maxConcurrency=10; var messages = new List(); using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency)) { List tasks = new List(); foreach(var msg in messages) { concurrencySemaphore.Wait(); var t = Task.Factory.StartNew(() => { try { Process(msg); } finally { concurrencySemaphore.Release(); } }); tasks.Add(t); } Task.WaitAll(tasks.ToArray()); } 

如果没有Task.WaitAll在控制台应用程序中运行以下代码,那么想要查看信号量如何处理的人可以回答评论 ,并且会引发此exception。

System.ObjectDisposedException:’信号量已被释放。’

 static void Main(string[] args) { int maxConcurrency = 5; List messages = Enumerable.Range(1, 15).Select(e => e.ToString()).ToList(); using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency)) { List tasks = new List(); foreach (var msg in messages) { concurrencySemaphore.Wait(); var t = Task.Factory.StartNew(() => { try { Process(msg); } finally { concurrencySemaphore.Release(); } }); tasks.Add(t); } // Task.WaitAll(tasks.ToArray()); } Console.WriteLine("Exited using block"); Console.ReadKey(); } private static void Process(string msg) { Thread.Sleep(2000); Console.WriteLine(msg); } } 

您可以使用Parallel.Foreach并依赖MaxDegreeOfParallelism

 Parallel.ForEach(messages, new ParallelOptions {MaxDegreeOfParallelism = 10}, msg => { // logic Process(msg); }); 

您可以像这样简单地设置最大并发度:

 int maxConcurrency=10; var messages = new List<1000>(); using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency)) { foreach(var msg in messages) { Task.Factory.StartNew(() => { concurrencySemaphore.Wait(); try { Process(msg); } finally { concurrencySemaphore.Release(); } }); } } 

认为用户并行LINQ会更好

  Parallel.ForEach(messages , new ParallelOptions{MaxDegreeOfParallelism = 4}, x => Process(x); ); 

其中x是最大并行度

您可以创建自己的TaskScheduler并在那里覆盖QueueTask。

 protected virtual void QueueTask(Task task) 

然后你可以做任何你喜欢的事情。

这里有一个例子:

处理包装任务的有限并发级别任务调度程序(具有任务优先级)

如果您需要按顺序排队(处理可能以任何顺序完成),则不需要信号量。 如果陈述工作正常,那么老式:

  const int maxConcurrency = 5; List tasks = new List(); foreach (var arg in args) { var t = Task.Run(() => { Process(arg); } ); tasks.Add(t); if(tasks.Count >= maxConcurrency) Task.WaitAny(tasks.ToArray()); } Task.WaitAll(tasks.ToArray());