并行读取和处理文件C#

我有非常大的文件,我必须阅读和处理。 这可以使用线程并行完成吗?

这是我做过的一些代码。 但它似乎不会缩短执行时间,一个接一个地读取和处理文件。

String[] files = openFileDialog1.FileNames; Parallel.ForEach(files, f => { readTraceFile(f); }); private void readTraceFile(String file) { StreamReader reader = new StreamReader(file); String line; while ((line = reader.ReadLine()) != null) { String pattern = "\\s{4,}"; foreach (String trace in Regex.Split(line, pattern)) { if (trace != String.Empty) { String[] details = Regex.Split(trace, "\\s+"); Instruction instruction = new Instruction(details[0], int.Parse(details[1]), int.Parse(details[2])); Console.WriteLine("computing..."); instructions.Add(instruction); } } } } 

看起来您的应用程序的性能主要受IO限制。 但是,您的代码中仍然有一些CPU限制工作。 这两项工作是相互依赖的:在IO完成其工作之前,您的CPU绑定工作无法启动,并且在CPU完成上一个工作之前,IO不会继续执行下一个工作项。 他们互相抱着对方。 因此,如果您并行执行IO和CPU绑定工作, 可以 (在最底部解释)可以看到吞吐量的提高,如下所示:

 void ReadAndProcessFiles(string[] filePaths) { // Our thread-safe collection used for the handover. var lines = new BlockingCollection(); // Build the pipeline. var stage1 = Task.Run(() => { try { foreach (var filePath in filePaths) { using (var reader = new StreamReader(filePath)) { string line; while ((line = reader.ReadLine()) != null) { // Hand over to stage 2 and continue reading. lines.Add(line); } } } } finally { lines.CompleteAdding(); } }); var stage2 = Task.Run(() => { // Process lines on a ThreadPool thread // as soon as they become available. foreach (var line in lines.GetConsumingEnumerable()) { String pattern = "\\s{4,}"; foreach (String trace in Regex.Split(line, pattern)) { if (trace != String.Empty) { String[] details = Regex.Split(trace, "\\s+"); Instruction instruction = new Instruction(details[0], int.Parse(details[1]), int.Parse(details[2])); Console.WriteLine("computing..."); instructions.Add(instruction); } } } }); // Block until both tasks have completed. // This makes this method prone to deadlocking. // Consider using 'await Task.WhenAll' instead. Task.WaitAll(stage1, stage2); } 

我非常怀疑这是你的CPU工作,但如果恰好是这种情况,你也可以像这样并行化第2阶段:

  var stage2 = Task.Run(() => { var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }; Parallel.ForEach(lines.GetConsumingEnumerable(), parallelOptions, line => { String pattern = "\\s{4,}"; foreach (String trace in Regex.Split(line, pattern)) { if (trace != String.Empty) { String[] details = Regex.Split(trace, "\\s+"); Instruction instruction = new Instruction(details[0], int.Parse(details[1]), int.Parse(details[2])); Console.WriteLine("computing..."); instructions.Add(instruction); } } }); }); 

请注意,如果CPU工作组件与IO组件相比可以忽略不计,那么您将看不到太多的加速。 工作量越均匀,与顺序处理相比,管道的性能就越好。

由于我们正在讨论性能问题,因此我对上述代码中阻塞调用的数量并不特别激动。 如果我在自己的项目中这样做,我会去async / await路线。 在这种情况下,我选择不这样做,因为我想让事情易于理解并易于集成。

从你想要做的事情来看,你几乎可以肯定是I / O绑定的。 在这种情况下尝试并行处理无济于事,实际上可能会因磁盘驱动器上的附加查找操作而导致处理速度变慢(除非您可以将数据拆分为多个轴)。

请尝试并行处理这些行。 例如:

 var q = from file in files from line in File.ReadLines(file).AsParallel() // for smaller files File.ReadAllLines(file).AsParallel() might be faster from trace in line.Split(new [] {" "}, StringSplitOptions.RemoveEmptyEntries) // split by 4 spaces and no need for trace != "" check let details = trace.Split(null as char[], StringSplitOptions.RemoveEmptyEntries) // like Regex.Split(trace, "\\s+") but removes empty strings too select new Instruction(details[0], int.Parse(details[1]), int.Parse(details[2])); List instructions = q.ToList(); // all of the file reads and work is done here with .ToList 

随机访问非SSD硬盘驱动器(当您尝试同时读取/写入不同文件或碎片文件时)通常比顺序访问慢得多(例如读取单个碎片整理文件),所以我希望处理单个文件并行使用碎片整理文件更快。

此外,跨线程共享资源(例如Console.Write或添加到线程安全阻塞集合)可能会减慢或阻塞/死锁执行,因为某些线程必须等待其他线程完成访问该资源。