CPU绑定任务的并行化继续IO绑定

我正在试图找到一种很好的方法来进行大数据集处理的代码并行化,然后将结果数据导入RavenDb。

数据处理受CPU限制,数据库导入IO绑定。

我正在寻找一个解决方案,在Environment.ProcessorCount线程上并行处理。 然后,应将结果数据导入到x(简称10)池中的RavenDb中并行处理上述过程。

这里的主要内容是我希望在导入完成数据时继续处理,以便在等待导入完成时继续处理下一个数据集。

另一个问题是成功导入后需要丢弃每批的内存,因为私有工作内存可以轻松达到> 5GB。

下面的代码是我到目前为止所得到的。 请注意,它没有满足上面列出的并行化要求。

datasupplier.GetDataItems() .Partition(batchSize) .AsParallel() .WithDegreeOfParallelism(Environment.ProcessorCount) .ForAll(batch => { Task.Run(() => { ... } } 

GetDataItem生成分区为批处理数据集的可枚举数据项。 GetDataItem将产生约2,000,000个项目,每个项目平均大约0.3ms进行处理。

该项目在x64平台上运行在最新的.NET 4.5 RC上。

更新。

我当前的代码(见上文)将获取项目并批量分区。 每个批处理在八个线程上并行处理(i7上的Environment.ProcessorCount)。 处理速度慢,CPU限制和内存密集。 完成单个批处理后,将启动一个任务,将结果数据异步导入RavenDb。 批量导入作业本身是同步的,如下所示:

 using (var session = Store.OpenSession()) { foreach (var data in batch) { session.Store(data); } session.SaveChanges(); } 

这种方法存在一些问题:

  1. 每次批处理完成时,都会启动任务以运行导入作业。 我想限制并行运行的任务数量(例如,max 10)。 此外,即使启动了许多任务,它们似乎也不会并行运行。

  2. 内存分配是一个巨大的问题。 处理/导入批处理后,它似乎仍然保留在内存中。

我正在寻找解决上述问题的方法。 理想情况下我想:

  • 每个逻辑处理器一个线程执行繁重的处理批量数据。
  • 十个左右的并行线程等待已完成的批次导入RavenDb。
  • 将内存分配保持在最小值,这意味着在导入任务完成后取消分配批处理。
  • 不在其中一个线程上运行导入作业以进行批处理。 已完成批次的导入应与正在处理的下一批次并行运行。

 var batchSize = 10000; var bc = new BlockingCollection<List>(); var importTask = Task.Run(() => { bc.GetConsumingEnumerable() .AsParallel() .WithExecutionMode(ParallelExecutionMode.ForceParallelism) .WithMergeOptions(ParallelMergeOptions.NotBuffered) .ForAll(batch => { using (var session = Store.OpenSession()) { foreach (var i in batch) session.Store(i); session.SaveChanges(); } }); }); var processTask = Task.Run(() => { datasupplier.GetDataItems() .Partition(batchSize) .AsParallel() .WithDegreeOfParallelism(Environment.ProcessorCount) .ForAll(batch => { bc.Add(batch.Select(i => new Data() { ... }).ToList()); }); }); processTask.Wait(); bc.CompleteAdding(); importTask.Wait(); 

您的任务整体听起来像生产者 – 消费者工作流程。 您的批处理器是生产者,您的RavenDB数据“导入”是生产者输出的消费者。

考虑使用BlockingCollection作为批处理proccesors与数据库导入程序之间的连接。 一旦批处理器将完成的批次推送到阻塞集合中,db导入器将立即唤醒,并且当他们“赶上”并清空集合时将重新进入睡眠状态。

批处理器生产者可以全速运行并始终与处理先前完成的批处理的db导入器任务并行运行。 如果您担心批处理器可能远远超过数据库导入程序(b / c db导入需要比处理每个批处理长得多),您可以在阻塞集合上设置上限,以便生成器在添加时阻塞超出这个限制,让消费者有机会赶上。

但是你的一些评论令人担忧。 启动Task实例以与批处理异步执行db导入没有什么特别的错误。 任务!=线程。 创建新任务实例与创建新线程没有相同的巨大开销。

不要试图过于精确地控制线程。 即使您指定的核心数量与核心数量完全相同,也不会独占使用这些核心。 来自其他进程的数百个其他线程仍将安排在您的时间片之间。 使用Tasks指定逻辑工作单元,让TPL管理线程池。 让自己避免虚假控制感的沮丧。 ;>

在您的注释中,您指出您的任务似乎没有彼此异步运行(您如何确定?)并且在每个批处理完成后似乎没有释放内存。 我建议放弃一切,直到你先弄清楚这两个问题是什么。 你忘了在某处调用Dispose()吗? 您是否持有不必要地保持整个对象树存活的引用? 你在测量正确的东西吗? 并行任务是否由阻塞数据库或网络I / O序列化? 在解决这两个问题之前,你的并行计划是什么并不重要。

对于每个批次,您都要开始一项任务。 这意味着您的循环非常快速地完成。 它留下(批次数)任务,而不是你想要的。 你想要的(CPU数量)。

解决方案:不要为每个批次启动新任务。 for循环已经是并行的。

在回复您的评论时,这是一个改进版本:

 //this runs in parallel var processedBatches = datasupplier.GetDataItems() .Partition(batchSize) .AsParallel() .WithDegreeOfParallelism(Environment.ProcessorCount) .Select(x => ProcessCpuBound(x)); foreach (var batch in processedBatches) { PerformIOIntensiveWorkSingleThreadedly(batch); //this runs sequentially } 

我最近构建了类似的东西,我使用了Parallel.Foreach的Queue类vs List。 我发现太multithreading实际上减慢了速度,有一个最佳点。