任务MaxDegreeOfParallelism可以每次从我的列表中获取前n个对象吗?

我在我的函数中打开n并发线程:

 List _files = new List(); public void Start() { CancellationTokenSource _tokenSource = new CancellationTokenSource(); var token = _tokenSource.Token; Task.Factory.StartNew(() => { try { Parallel.ForEach(_files, new ParallelOptions { MaxDegreeOfParallelism = 5 //limit number of parallel threads }, file => { if (token.IsCancellationRequested) return; //do work... }); } catch (Exception) { } }, _tokenSource.Token).ContinueWith( t => { //finish... } , TaskScheduler.FromCurrentSynchronizationContext() //to ContinueWith (update UI) from UI thread ); } 

线程打开后,我注意到它从我的列表中选择了随机文件。 是否有可能每次从我的列表中选择前n元素?

要获得您想要的行为,您需要编写自定义分区程序 ,它看起来“随机”的原因是它现在正在以块为单位批处理文件列表,所以如果您的源列表是

 List files = List { "a", "b", "c", "d", "e", "f", "g", "h", "i" }; 

当它分区时,它可以像这样均匀地分割(如果Max是3个线程):

  • Thread1的工作清单:“a”,“b”,“c”
  • Thread2的工作清单:“d”,“e”,“f”
  • Thread3的工作清单:“g”,“h”,“i”

因此,如果你看过正在处理的文件,它可能看起来像

 "a", "d", "g", "e", "b", "h", "c", "f", "i" 

如果你制作一个自定义分区程序,你可以让它一次只取一个项目而不是一次批处理,使工作列表看起来像

  • Thread1的工作清单:“a”, GetTheNextUnprocessedString()
  • Thread2的工作清单:“b”, GetTheNextUnprocessedString()
  • Thread3的工作清单:“c”, GetTheNextUnprocessedString()

如果您使用的是.NET 4.5,则可以像这样使用此工厂 :

 Parallel.ForEach(Partitioner.Create(_files, EnumerablePartitionerOptions.NoBuffering), new ParallelOptions { MaxDegreeOfParallelism = 5 //limit number of parallel threads }, (file, loopstate, index) => { if (token.IsCancellationRequested) return; //do work... }); 

如果你不使用.NET 4.5,这不是一项微不足道的任务,所以我不打算在这里为你写。 阅读我在顶部链接的MSDN文章,您最终可以弄清楚。

我要做的是问自己“我真的需要按顺序处理文件吗?” 如果您不需要它们按顺序让它自己进行排序,因为您通过执行订单可能会做的唯一事情可能会减慢流程。

如果工作项以特定顺序启动很重要,那就不要依赖Parallel.ForEach ; 正如其他人所说,你可以根据需要进行配置,但这并不容易。

更容易的选择是创建5个不同的任务来处理项目。 它无法根据需要动态添加/删除工作人员,但无论如何您似乎都没有充分利用这一点。

只需创建一个BlockingCollection和5个从中获取项目的任务:

 var queue = new BlockingCollection(); int workers = 5; CancellationTokenSource cts = new CancellationTokenSource(); var tasks = new List(); for (int i = 0; i < workers; i++) { tasks.Add(Task.Run(() => { foreach (var item in queue.GetConsumingEnumerable()) { cts.Token.ThrowIfCancellationRequested(); DoWork(item); } }, cts.Token)); } //throw this into a new task if adding the items will take too long foreach (var item in data) queue.Add(item); queue.CompleteAdding(); Task.WhenAll(tasks).ContinueWith(t => { //do completion stuff }); 

当然文件是随机选择的,这就是parallel.foreach的重点。 如果你并行,你指定的5个线程将​​使用输入,因为它由数据partionier决定。

但是如果你真的想维护订单,请检查你可以为parallel.foreach指定的OrderablePartitioner。 – > http://msdn.microsoft.com/en-us/library/dd989583.aspx但当然这会降低性能,但它允许您指定如何为线程创建分区。