如何限制c#中的最大并行任务数
我有一个1000输入消息的集合来处理。 我正在循环输入集合并为每个消息启动新任务以进行处理。
//Assume this messages collection contains 1000 items var messages = new List(); foreach (var msg in messages) { Task.Factory.StartNew(() => { Process(msg); }); }
我们可以猜测当时同时处理多少个最大消息(假设是普通的四核处理器),还是我们可以限制当时要处理的最大消息数?
如何确保以与Collection相同的顺序/顺序处理此消息?
在这种情况下,SemaphoreSlim是一个非常好的解决方案,我强烈建议OP尝试这个,但@ Manoj的答案有注释中提到的缺陷。在产生这样的任务之前应该等待。
更新后的答案:由于@Vasyl指出信号量可能在任务完成之前被释放,并且在调用Release()方法时会引发exception,因此在退出使用块之前必须等待所有创建的任务完成。
int maxConcurrency=10; var messages = new List(); using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency)) { List tasks = new List (); foreach(var msg in messages) { concurrencySemaphore.Wait(); var t = Task.Factory.StartNew(() => { try { Process(msg); } finally { concurrencySemaphore.Release(); } }); tasks.Add(t); } Task.WaitAll(tasks.ToArray()); }
如果没有Task.WaitAll
在控制台应用程序中运行以下代码,那么想要查看信号量如何处理的人可以回答评论 ,并且会引发此exception。
System.ObjectDisposedException:’信号量已被释放。’
static void Main(string[] args) { int maxConcurrency = 5; List messages = Enumerable.Range(1, 15).Select(e => e.ToString()).ToList(); using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency)) { List tasks = new List (); foreach (var msg in messages) { concurrencySemaphore.Wait(); var t = Task.Factory.StartNew(() => { try { Process(msg); } finally { concurrencySemaphore.Release(); } }); tasks.Add(t); } // Task.WaitAll(tasks.ToArray()); } Console.WriteLine("Exited using block"); Console.ReadKey(); } private static void Process(string msg) { Thread.Sleep(2000); Console.WriteLine(msg); } }
您可以使用Parallel.Foreach
并依赖MaxDegreeOfParallelism
。
Parallel.ForEach(messages, new ParallelOptions {MaxDegreeOfParallelism = 10}, msg => { // logic Process(msg); });
您可以像这样简单地设置最大并发度:
int maxConcurrency=10; var messages = new List<1000>(); using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency)) { foreach(var msg in messages) { Task.Factory.StartNew(() => { concurrencySemaphore.Wait(); try { Process(msg); } finally { concurrencySemaphore.Release(); } }); } }
认为用户并行LINQ会更好
Parallel.ForEach(messages , new ParallelOptions{MaxDegreeOfParallelism = 4}, x => Process(x); );
其中x是最大并行度
您可以创建自己的TaskScheduler并在那里覆盖QueueTask。
protected virtual void QueueTask(Task task)
然后你可以做任何你喜欢的事情。
这里有一个例子:
处理包装任务的有限并发级别任务调度程序(具有任务优先级)
如果您需要按顺序排队(处理可能以任何顺序完成),则不需要信号量。 如果陈述工作正常,那么老式:
const int maxConcurrency = 5; List tasks = new List (); foreach (var arg in args) { var t = Task.Run(() => { Process(arg); } ); tasks.Add(t); if(tasks.Count >= maxConcurrency) Task.WaitAny(tasks.ToArray()); } Task.WaitAll(tasks.ToArray());