如何依靠I / O大量正确地并行工作

我正在构建一个必须处理大量数据的控制台应用程序。

基本上,应用程序从数据库中获取引用。 对于每个引用,解析文件的内容并进行一些更改。 这些文件是HTML文件,并且该过程正在使用RegEx替换进行繁重的工作(查找引用并将它们转换为链接)。 然后将结果存储在文件系统中并发送到外部系统。

如果我按顺序恢复该过程:

var refs = GetReferencesFromDB(); // ~5000 Datarow returned foreach(var ref in refs) { var filePath = GetFilePath(ref); // This method looks up in a previously loaded file list var html = File.ReadAllText(filePath); // Read html locally, or from a network drive var convertedHtml = ParseHtml(html); File.WriteAllText(destinationFilePath); // Copy the result locally, or a network drive SendToWs(ref, convertedHtml); } 

我的程序工作正常,但速度很慢。 这就是我想要将这个过程并行化的原因。

到现在为止,我做了一个简单的并行化添加AsParallel:

 var refs = GetReferencesFromDB().AsParallel(); refs.ForAll(ref=> { var filePath = GetFilePath(ref); var html = File.ReadAllText(filePath); var convertedHtml = ParseHtml(html); File.WriteAllText(destinationFilePath); SendToWs(ref, convertedHtml); }); 

这种简单的改变减少了过程的持续时间(减少了25%的时间)。 但是,我对并行化的理解是,如果依赖于I / O的资源并行,那么将没有太多的好处(或者更糟的是,更少的好处),因为i / o不会神奇地加倍。

这就是为什么我认为我应该改变我的方法而不是并行化整个过程,而是创建依赖的链式排队任务。

IE,我应该创建一个流程:

队列读取文件。 完成后,Queue ParseHtml。 完成后,Queue都会发送到WS并在本地写入。 完成后,记录结果。

但是,我不知道如何实现这样的思考。

我觉得它会以一组消费者/生产者队列结束,但我找不到正确的样本。

而且,我不确定是否会有好处。

谢谢你的建议

[编辑]事实上,我是使用c#4.5的完美候选人…如果只是rtm 🙂

[编辑2]让我觉得它没有正确并行化的另一件事是,在资源监视器中,我看到CPU,网络I / O和磁盘I / O的图表不稳定。 当一个人高,其他人是低到中等

您没有在任何代码中利用任何异步I / O API。 你所做的一切都是CPU限制的,所有的I / O操作都会浪费CPU资源阻塞。 AsParallel用于计算绑定任务,如果要利用异步I / O,则需要在<= v4.0中利用基于异步编程模型(APM)的API。 这是通过在您正在使用的基于I / O的类上查找BeginXXX/EndXXX方法并在可用时利用这些方法来完成的。

阅读本文的初学者: TPL TaskFactory.FromAsync vs Tasks with blocking方法

接下来,无论如何你都不想在这种情况下使用AsParallelAsParallel启用流式传输,这将导致每个项目立即安排一个新任务,但您不需要/想要这里。 使用Parallel::ForEach对工作进行分区会更好。

让我们看看如何使用这些知识在特定情况下实现最大并发性:

 var refs = GetReferencesFromDB(); // Using Parallel::ForEach here will partition and process your data on separate worker threads Parallel.ForEach( refs, ref => { string filePath = GetFilePath(ref); byte[] fileDataBuffer = new byte[1048576]; // Need to use FileStream API directly so we can enable async I/O FileStream sourceFileStream = new FileStream( filePath, FileMode.Open, FileAccess.Read, FileShare.Read, 8192, true); // Use FromAsync to read the data from the file Task readSourceFileStreamTask = Task.Factory.FromAsync( sourceFileStream.BeginRead sourceFileStream.EndRead fileDataBuffer, fileDataBuffer.Length, null); // Add a continuation that will fire when the async read is completed readSourceFileStreamTask.ContinueWith(readSourceFileStreamAntecedent => { int soureFileStreamBytesRead; try { // Determine exactly how many bytes were read // NOTE: this will propagate any potential exception that may have occurred in EndRead sourceFileStreamBytesRead = readSourceFileStreamAntecedent.Result; } finally { // Always clean up the source stream sourceFileStream.Close(); sourceFileStream = null; } // This is here to make sure you don't end up trying to read files larger than this sample code can handle if(sourceFileStreamBytesRead == fileDataBuffer.Length) { throw new NotSupportedException("You need to implement reading files larger than 1MB. :P"); } // Convert the file data to a string string html = Encoding.UTF8.GetString(fileDataBuffer, 0, sourceFileStreamBytesRead); // Parse the HTML string convertedHtml = ParseHtml(html); // This is here to make sure you don't end up trying to write files larger than this sample code can handle if(Encoding.UTF8.GetByteCount > fileDataBuffer.Length) { throw new NotSupportedException("You need to implement writing files larger than 1MB. :P"); } // Convert the file data back to bytes for writing Encoding.UTF8.GetBytes(convertedHtml, 0, convertedHtml.Length, fileDataBuffer, 0); // Need to use FileStream API directly so we can enable async I/O FileStream destinationFileStream = new FileStream( destinationFilePath, FileMode.OpenOrCreate, FileAccess.Write, FileShare.None, 8192, true); // Use FromAsync to read the data from the file Task destinationFileStreamWriteTask = Task.Factory.FromAsync( destinationFileStream.BeginWrite, destinationFileStream.EndWrite, fileDataBuffer, 0, fileDataBuffer.Length, null); // Add a continuation that will fire when the async write is completed destinationFileStreamWriteTask.ContinueWith(destinationFileStreamWriteAntecedent => { try { // NOTE: we call wait here to observe any potential exceptions that might have occurred in EndWrite destinationFileStreamWriteAntecedent.Wait(); } finally { // Always close the destination file stream destinationFileStream.Close(); destinationFileStream = null; } }, TaskContinuationOptions.AttachedToParent); // Send to external system **concurrent** to writing to destination file system above SendToWs(ref, convertedHtml); }, TaskContinuationOptions.AttachedToParent); }); 

现在,这里有几点说明:

  1. 这是示例代码,所以我使用1MB缓冲区来读/写文件。 对于HTML文件来说这是过度的,浪费了系统资源。 您可以降低它以满足您的最大需求,或者将链式读/写实现到StringBuilder中,这是一个我留给您的练习,因为我要编写~500多行代码来执行异步链接读/写操作。 :P
  2. 您会注意到,在读/写任务的延续中,我有TaskContinuationOptions.AttachedToParent 。 这非常重要,因为它将阻止Parallel::ForEach启动工作的工作线程完成,直到完成所有底层异步调用。 如果不在这里,您将同时启动所有5000个项目的工作,这会使TPL子系统污染数千个计划任务并且根本无法正确扩展。
  3. 我调用SendToWs并将文件写入文件共享。 我不知道SendToWs的实现有什么潜在的基础,但它听起来像是一个很好的候选者来制作异步。 现在它假设它是纯粹的计算工作,因此,它将在执行时刻录CPU线程。 我把它作为一个练习让你弄清楚如何最好地利用我所展示的那些来提高那里的吞吐量。
  4. 这是所有类型的自由forms,我的大脑是这里唯一的编译器和SO的语法higlighting是我用来确保语法是好的。 所以,请原谅任何语法错误,让我知道如果我搞得太糟糕,你无法做出正面或反面,我会跟进。

好消息是您的逻辑可以很容易地分成生产者 – 消费者管道中的步骤。

  • 第1步:读取文件
  • 第2步:解析文件
  • 第3步:写入文件
  • 第4步:SendToWs

如果您使用的是.NET 4.0,则可以使用BlockingCollection数据结构作为每个步骤的生产者 – 使用者队列的主干。 主线程将每个工作项排入第1步的队列,在那里它将被拾取和处理,然后转发到第2步的队列,依此类推。

如果您愿意继续使用Async CTP,那么您也可以利用新的TPL Dataflow结构。 除其他之外,还有BufferBlock数据结构,其行为方式与BlockingCollection类似,并与新的asyncawait关键字很好地集成。

由于您的算法是IO绑定的,因此生产者 – 消费者策略可能无法为您提供所需的性能提升,但至少您将拥有一个非常优雅的解决方案,如果您可以提高IO吞吐量,该解决方案可以很好地扩展。 我担心步骤1和3将成为瓶颈,管道将无法很好地平衡,但值得尝试。

只是一个建议,但你有没有看过消费者/生产者模式? 一定数量的线程会读取磁盘上的文件并将内容提供给队列。 然后另一组线程(称为消费者)将“消耗”队列作为其填充。 http://zone.ni.com/devzone/cda/tut/p/id/3023

在这种情况下,你最好的选择绝对是生产者 – 消费者模型。 一个线程来拉取数据和一堆工人来处理它。 在I / O周围没有简单的方法,所以你可能只关注优化计算本身。

我现在将尝试绘制一个模型:

 // producer thread var refs = GetReferencesFromDB(); // ~5000 Datarow returned foreach(var ref in refs) { lock(queue) { queue.Enqueue(ref); event.Set(); } // if the queue is limited, test if the queue is full and wait. } // consumer threads while(true) { value = null; lock(queue) { if(queue.Count > 0) { value = queue.Dequeue(); } } if(value != null) // process value else event.WaitOne(); // event to signal that an item was placed in the queue. } 

您可以在C#中的线程的第4部分中找到有关生产者/消费者的更多详细信息: http : //www.albahari.com/threading/part4.aspx

我认为您分割文件列表并在一批中处理每个文件的方法是可以的。 我的感觉是,如果你玩并行度,你可能会获得更多的性能提升。 请参阅: var refs = GetReferencesFromDB().AsParallel().WithDegreeOfParallelism(16); 这将开始同时处理16个文件。 目前,您可能正在处理2或4个文件,具体取决于您拥有的核心数量。 只有在没有IO的情况下进行计算时,这才有效。 对于IO密集型任务,调整可能会带来惊人的性能改进,从而减少处

如果您要使用生产者 – 消费者分解并加入任务,请查看此示例: 使用Parallel Linq Extensions将两个序列合并,如何才能首先产生最快的结果?