如何尽可能高效地处理大量并发磁盘写请求

假设下面的方法被.net 4应用程序中的不同线程调用了几千次。 处理这种情况的最佳方法是什么? 了解磁盘是这里的瓶颈,但我希望WriteFile()方法快速返回。

数据可以高达几MB。 我们在谈论线程池,TPL等吗?

public void WriteFile(string FileName, MemoryStream Data) { try { using (FileStream DiskFile = File.OpenWrite(FileName)) { Data.WriteTo(DiskFile); DiskFile.Flush(); DiskFile.Close(); } } catch (Exception e) { Console.WriteLine(e.Message); } } 

既然你说文件不需要按顺序写入也不需要立即编写,最简单的方法是使用Task

 private void WriteFileSynchronous(string FileName, MemoryStream Data) { Task.Factory.StartNew(() => WriteFileSynchronously(FileName, Data)); } private void WriteFileSynchronous(string FileName, MemoryStream Data) { try { using (FileStream DiskFile = File.OpenWrite(FileName)) { Data.WriteTo(DiskFile); DiskFile.Flush(); DiskFile.Close(); } } catch (Exception e) { Console.WriteLine(e.Message); } } 

TPL在内部使用线程池,即使对于大量任务也应该相当有效。

如果你想快速返回而不是真正关心操作是同步的,你可以在内存Queue中创建某种类型的写入请求,并且当Queue没有填满时你可以快速从方法返回。 另一个线程将负责调度Queue和写入文件。 如果你的WriteFile被调用并且队列已满,你将不得不等到你可以排队并且执行将再次变为同步,但是这样你可以有一个大的缓冲区,所以如果进程文件写请求不是线性的,而是更尖刻的(在写文件调用尖峰之间暂停)这种变化可以看作是对性能的改进。

更新:为你做一个小图片。 请注意,瓶颈始终存在,您可以做的就是使用队列优化请求。 请注意,队列有限制,所以当它填满时,你不能将文件队列到,你必须等待,所以在那个缓冲区中也有一个空闲空间。 但是对于图片中呈现的情况(3个桶请求),显而易见的是,您可以快速将桶放入队列并返回,而在第一种情况下,您必须逐个执行此操作并阻止执行。

请注意,您永远不需要一次执行多个IO线程,因为它们都会使用相同的瓶颈,如果您尝试并行处理这个问题,您将浪费内存,我相信2到10个线程顶部将轻松获取所有可用的IO带宽,并将限制应用程序内存使用量。

在此处输入图像描述

如果数据的传输速度比您可以记录的速度快,那么您就会遇到真正的问题。 生产者/消费者设计, WriteFile只是将东西扔进ConcurrentQueue或类似的结构,并且为该队列服务的单独线程工作得很好……直到队列填满。 如果您正在讨论打开50,000个不同的文件,那么事情就会快速备份。 更不用说每个文件的数据可能是几兆字节,这将进一步限制队列的大小。

我有一个类似的问题,我通过将WriteFile方法附加到单个文件来解决。 它写的记录有记录号,文件名,长度,然后是数据。 汉斯在对原始问题的评论中指出,写入文件很快; 打开文件很慢。

程序中的第二个线程开始读取WriteFile写入的文件。 该线程读取每个记录头(数字,文件名,长度),打开一个新文件,然后将数据从日志文件复制到最终文件。

如果日志文件和最终文件位于不同的磁盘上,则效果会更好,但是对于单个主轴,它仍然可以正常工作。 但它肯定能锻炼你的硬盘。

它的缺点是需要2倍的磁盘空间,但是在150美元以下的2TB驱动器中,我不认为这是一个很大的问题。 它总体上比直接写入数据效率低(因为你必须处理两次数据),但它的好处是不会导致主处理线程停顿。

将完整的方法实现封装在新的Thread() 。 然后你可以“发射并忘记”这些线程并返回主调用线程。

  foreach (file in filesArray) { try { System.Threading.Thread updateThread = new System.Threading.Thread(delegate() { WriteFileSynchronous(fileName, data); }); updateThread.Start(); } catch (Exception ex) { string errMsg = ex.Message; Exception innerEx = ex.InnerException; while (innerEx != null) { errMsg += "\n" + innerEx.Message; innerEx = innerEx.InnerException; } errorMessages.Add(errMsg); } }