使用.NET进行multithreading文件处理

有一个包含1000个小文本文件的文件夹。 我的目标是解析和处理所有这些文件,同时将更多文件填充到文件夹中。 我的目的是multithreading这个操作,因为单线程原型花了六分钟来处理1000个文件。

我喜欢读写器线程如下。 当读者线程正在读取文件时,我想让编写器线程来处理它们。 一旦阅读器开始阅读文件,我想将其标记为正在处理,例如通过重命名。 读完后,将其重命名为已完成。

我如何处理这样的multithreading应用程序?

使用分布式哈希表或队列更好吗?

我使用哪种数据结构可以避免锁定?

这个方案有更好的方法吗?

由于对评论中.NET 4的工作方式有好奇心,所以这就是这种方法。 对不起,OP可能不是一个选项。 免责声明:这不是一个高度科学的分析,只是表明有明显的性能优势。 基于硬件,您的里程可能差异很大。

这是一个快速测试(如果你看到这个简单测试中的一个大错误,它只是一个例子。请评论,我们可以修复它更有用/准确)。 为此,我将12,000~60 KB的文件作为样本放入目录中(启动LINQPad ;你可以自己玩它,免费! – 确保获得LINQPad 4 ):

var files = Directory.GetFiles("C:\\temp", "*.*", SearchOption.AllDirectories).ToList(); var sw = Stopwatch.StartNew(); //start timer files.ForEach(f => File.ReadAllBytes(f).GetHashCode()); //do work - serial sw.Stop(); //stop sw.ElapsedMilliseconds.Dump("Run MS - Serial"); //display the duration sw.Restart(); files.AsParallel().ForAll(f => File.ReadAllBytes(f).GetHashCode()); //parallel sw.Stop(); sw.ElapsedMilliseconds.Dump("Run MS - Parallel"); 

大多数 简单情况下 ,只需略微更改循环以并行化查询即可。 通过“简单”,我主要意味着一个动作的结果不会影响下一个动作。 最常记住的是一些集合,例如我们方便的List 不是线程安全的 ,所以在并行场景中使用它并不是一个好主意:)幸运的是, 在.NET 4中添加了并发集合这是线程安全的。 另外请记住,如果您使用锁定集合,这可能也是一个瓶颈,具体取决于具体情况。

这使用.NET 4.0中提供的.AsParallel(IEnumeable).ForAll(ParallelQuery)扩展。 .AsParallel()调用在实现ParallelEnumerableWrapperParallelEnumerableWrapper (内部类)中包装IEnumerable ParallelQuery 。 现在允许您使用并行扩展方法 ,在这种情况下我们使用.ForAll()

.ForAll()内部创建一个ForAllOperator(query, action)并同步运行它。 这会在线程运行之后处理线程的线程和合并……在那里有相当多的进展, 如果你想了解更多,我建议从这里开始,包括其他选项 。


结果(计算机1 – 物理硬盘):

  • 型号1288 – 1333ms
  • 平行: 461 – 503ms

电脑规格 – 用于比较:

  • 四核i7 920 @ 2.66 GHz
  • 12 GB RAM(DDR 1333)
  • 300 GB 10k rpm WD VelociRaptor

结果(计算机2 – 固态硬盘):

  • 型号: 545 – 601 ms
  • 平行: 248 – 278毫秒

电脑规格 – 用于比较:

  • 四核2 Quad Q9100 @ 2.26 GHz
  • 8 GB RAM(DDR 1333)
  • 120 GB OCZ Vertex SSD (标准版 – 1.4固件)

这次我没有CPU / RAM的链接,这些已经安装好了。 这是戴尔M6400笔记本电脑( 这里是M6500的链接 ……戴尔自己的6400链接已损坏 )。


这些数字来自10次运行,取内部8个结果的最小值/最大值(删除每个可能的exception值的原始最小值/最大值)。 我们在这里遇到了I / O瓶颈,特别是在物理驱动器上,但想想串行方法的作用。 它读取,处理,读取,处理,冲洗重复。 使用并行方法,您(即使有I / O瓶颈) 同时读取和处理。 在最严重的瓶颈情况下,您正在处理一个文件,同时阅读下一个文件。 仅此一项(在任何当前的计算机上!)应该会带来一些性能提升。 你可以看到我们在上面的结果中一次可以得到一个以上,给我们一个健康的提升。

另一个免责声明:四核+ .NET 4并行不会给你四倍的性能,它不会线性扩展…还有其他考虑因素和瓶颈在起作用。

我希望这有兴趣展示方法和可能的好处。 随意批评或改进…这个答案仅存在于评论中指出的好奇者:)

设计

生产者/消费者模式可能对这种情况最有用。 您应该创建足够的线程以最大化吞吐量。

以下是关于制作人/消费者模式的一些问题,以便您了解其工作原理:

  • C#生产者/消费者模式
  • C#生产者/消费者

您应该使用阻塞队列,并且生产者应该在消费者处理队列中的文件时将文件添加到队列中。 阻塞队列不需要锁定,因此它是解决问题的最有效方法。

如果您使用的是.NET 4.0,则可以使用多个并发集合 :

  • ConcurrentQueue: http : //msdn.microsoft.com/en-us/library/dd267265%28v=VS.100%29.aspx
  • BlockingCollection: http : //msdn.microsoft.com/en-us/library/dd267312%28VS.100%29.aspx

穿线

单个生产者线程可能是从磁盘加载文件并将其推入队列的最有效方式; 随后,多个消费者将从队列中弹出项目,他们将处理它们。 我建议您每个核心尝试2-4个消费者线程,并进行一些性能测量,以确定哪个是最优的(即为您提供最大吞吐量的线程数)。 我建议在这个特定的例子中使用ThreadPool。

PS我不明白单点故障和分布式哈希表的使用是什么? 我知道DHT听起来真的很酷,但我会首先尝试传统方法,除非你有一个特定的问题,你想要解决。

我建议您为每个文件排队一个线程,并跟踪字典中正在运行的线程,在线程完成时启动一个新线程,直到达到最大限制。 我喜欢在长时间运行时创建自己的线程,并在完成或遇到exception时使用回调来发出信号。 在下面的示例中,我使用字典来跟踪正在运行的worker实例。 这样,如果我想提前停止工作,我可以调用一个实例。 回调还可用于更新具有进度和吞吐量的UI。 您还可以为添加的点动态调整运行的线程限制。

示例代码是缩写演示器,但它确实运行。

 class Program { static void Main(string[] args) { Supervisor super = new Supervisor(); super.LaunchWaitingThreads(); while (!super.Done) { Thread.Sleep(200); } Console.WriteLine("\nDone"); Console.ReadKey(); } } public delegate void StartCallbackDelegate(int idArg, Worker workerArg); public delegate void DoneCallbackDelegate(int idArg); public class Supervisor { Queue waitingThreads = new Queue(); Dictionary runningThreads = new Dictionary(); int maxThreads = 20; object locker = new object(); public bool Done { get { lock (locker) { return ((waitingThreads.Count == 0) && (runningThreads.Count == 0)); } } } public Supervisor() { // queue up a thread for each file Directory.GetFiles("C:\\folder").ToList().ForEach(n => waitingThreads.Enqueue(CreateThread(n))); } Thread CreateThread(string fileNameArg) { Thread thread = new Thread(new Worker(fileNameArg, WorkerStart, WorkerDone).ProcessFile); thread.IsBackground = true; return thread; } // called when a worker starts public void WorkerStart(int threadIdArg, Worker workerArg) { lock (locker) { // update with worker instance runningThreads[threadIdArg] = workerArg; } } // called when a worker finishes public void WorkerDone(int threadIdArg) { lock (locker) { runningThreads.Remove(threadIdArg); } Console.WriteLine(string.Format(" Thread {0} done", threadIdArg.ToString())); LaunchWaitingThreads(); } // launches workers until max is reached public void LaunchWaitingThreads() { lock (locker) { while ((runningThreads.Count < maxThreads) && (waitingThreads.Count > 0)) { Thread thread = waitingThreads.Dequeue(); runningThreads.Add(thread.ManagedThreadId, null); // place holder so count is accurate thread.Start(); } } } } public class Worker { string fileName; StartCallbackDelegate startCallback; DoneCallbackDelegate doneCallback; public Worker(string fileNameArg, StartCallbackDelegate startCallbackArg, DoneCallbackDelegate doneCallbackArg) { fileName = fileNameArg; startCallback = startCallbackArg; doneCallback = doneCallbackArg; } public void ProcessFile() { startCallback(Thread.CurrentThread.ManagedThreadId, this); Console.WriteLine(string.Format("Reading file {0} on thread {1}", fileName, Thread.CurrentThread.ManagedThreadId.ToString())); File.ReadAllBytes(fileName); doneCallback(Thread.CurrentThread.ManagedThreadId); } } 

一般来说,1000个小文件(有多小,顺便说一句?)不应该花费6分钟来处理。 作为一个快速测试,在包含文件的目录中find "foobar" * (引号中的第一个参数无关紧要;它可以是任何内容),并查看处理每个文件所需的时间。 如果需要超过一秒钟,我会感到失望。

假设这个测试证实了我的怀疑,那么这个过程是CPU限制的,你将把读数分成它自己的线程没有任何改进。 你应该:

  1. 弄清楚为什么平均需要超过350毫秒来处理一个小输入,并希望改进算法。
  2. 如果没有办法加速算法并且你有一台多核机器(现在几乎每个人都这样做),使用线程池为每个读取一个文件的工作分配1000个任务。

您可以拥有一个中央队列,读取器线程在将内存内容推送到队列期间需要写访问权限。 处理线程需要对该中央队列的读访问权以弹出要处理的下一个存储器流。 这样,您可以最大限度地减少锁中所花费的时间,而无需处理无锁代码的复杂性。

编辑:理想情况下,您将优雅地处理所有exception/错误条件(如果有),因此您没有失败点。

作为替代方案,您可以拥有多个线程,每个线程在处理之前通过重命名来“声明”文件,因此文件系统成为锁定访问的实现。 不知道这是否比我原来的答案更高效,只有测试才能说明。

您可以考虑要处理的文件队列。 通过在启动时扫描目录来填充队列,并使用FileSystemWatcher更新队列,以便有效地将新文件添加到队列中,而无需不断地重新扫描目录。

如果可能的话,读写不同的物理磁盘。 这将为您提供最大的IO性能。

如果您要处理许多文件的初始突发,然后添加的文件速度不均匀且这一切都发生在同一磁盘上(读/写),您可以考虑将处理过的文件缓冲到内存,直到其中一个条件适用:

  • 有(暂时)没有新文件
  • 你已经缓冲了很多文件,你不想使用更多的内存来缓冲(理想情况下是一个可配置的阈值)

如果您对文件的实际处理是CPU密集型的,则可以考虑为每个CPU核心配置一个处理线程。 但是,对于“正常”处理,与IO时间相比,CPU时间将是微不足道的,并且复杂性不值得任何微小的收益。