C#Threading – 同时读取和散列多个文件,最简单的方法?

我一直试图得到我认为最简单的线程forms在我的应用程序中工作,但我不能这样做。

我想做什么:我有一个带有状态条的主表单和一个进度条。 我必须读取3到99个文件之间的内容并将它们的哈希值添加到字符串[]中,我想将其添加到所有文件的列表中,并使用它们各自的哈希值。 之后我必须将该列表中的项目与数据库(文本文件中的数据库)进行比较。 完成所有操作后,我必须将主窗体和进度条中的文本框更新为33%; 大多数情况下,我只是不希望主窗体在处理过程中冻结。

我正在使用的文件总是达到1.2GB(+/-几MB),这意味着我应该能够将它们读入byte []并从那里处理它们(我必须计算CRC32,MD5和SHA1)这些文件中的每一个应该比从HDD中读取所有文件的速度快3倍。

另外我应该注意,一些文件可能是1MB而另一个可能是1GB。 我最初想为99个文件创建99个线程,但这似乎不明智,我想最好重用小文件的线程,而更大的文件线程仍在运行。 但这对我来说听起来很复杂,所以我不确定这是否也是明智之举。

到目前为止,我已经尝试过workerThreads和backgroundWorkers,但似乎对我来说似乎都不太好; 至少backgroundWorkers工作了一些时间,但我甚至无法弄清楚为什么他们不会在其他时间…主要forms仍然冻结的方式。 现在我已经阅读了.NET 4.0中的任务并行库,但我认为在浪费更多时间之前我应该​​更好地问一个知道自己在做什么的人。

我想做的事情看起来像这样(没有线程):

List fileSpecifics = new List(); int fileMaxNumber = 42; // something between 3 and 99, depending on file set for (int i = 1; i <= fileMaxNumber; i++) { string fileName = "C:\\path\\to\\file" + i.ToString("D2") + ".ext"; // file01.ext - file99.ext string fileSize = new FileInfo(fileName).Length.ToString(); byte[] file = File.ReadAllBytes(fileName); // hash calculations (using SHA1CryptoServiceProvider() etc., no problems with that so I'll spare you that, return strings) file = null; // I didn't yet check if this made any actual difference but I figured it couldn't hurt fileSpecifics.Add(new string[] { fileName, fileSize, fileCRC, fileMD5, fileSHA1 }); } // look for files in text database mentioned above, ie first check for "file bundles" with the same amount of files I have here; then compare file sizes, then hashes // again, no problems with that so I'll spare you that; the database text files are pretty small so parsing them doesn't need to be done in an extra thread. 

有人能够指出我正确的方向吗? 我正在寻找最简单的方法来快速读取和散列这些文件(我相信散列需要花费一些时间才能读取其他文件)并将输出保存到字符串[],而不会使主窗体冻结,仅此而已,没什么。

我很感激任何意见。

编辑澄清:通过“backgroundWorkers工作的一些时间”我的意思是(对于同一组文件),也许我的代码的第一次和第四次执行产生正确的输出,UI在5秒内解冻,第二次,第三次和第五次执行它冻结了表单(并在60秒后我得到一条错误消息,说某个线程在该时间范围内没有响应)并且我必须通过VS停止执行。

感谢你的所有建议和指示,因为你们都已经正确地猜到我对线程是全新的,并且必须阅读你们发布的优秀链接。 然后我会试试这些方法并标出最能帮助我的答案。 再次感谢!

使用.NET Framework 4.X

  1. 使用Directory.EnumerateFiles方法进行高效/惰性文件枚举
  2. 使用Parallel.For()将并行工作委托给PLINQ框架或使用TPL委派每个管道阶段的单个任务
  3. 使用管道模式管理以下阶段:计算哈希码,与模式进行比较,更新UI
  4. 为了避免UI冻结使用适当的技术:对于WPF使用Dispatcher.BeginInvoke() ,对于WinForms使用Invoke(), 请参阅此SO答案
  5. 考虑到所有这些东西都有UI,如果需要添加一些取消function放弃长时间运行可能是有用的,看看CreateLinkedTokenSource类允许从“外部范围”触发CancellationToken我可以尝试添加一个例子,但它值得做它自己,所以你会学习所有这些东西,而不是简单地复制/粘贴 – >让它工作 – >忘了它。

PS:必须阅读 – MSDN上的Pipelines论文


TPL特定的管道实现

  • 管道模式实现:三个阶段:计算哈希,匹配,更新UI
  • 三个任务,每个阶段一个
  • 两个阻塞队列

//

 // 1) CalculateHashesImpl() should store all calculated hashes here // 2) CompareMatchesImpl() should read input hashes from this queue // Tuple.Item1 - hash, Typle.Item2 - file path var calculatedHashes = new BlockingCollection>(); // 1) CompareMatchesImpl() should store all pattern matching results here // 2) SyncUiImpl() method should read from this collection and update // UI with available results var comparedMatches = new BlockingCollection(); var factory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None); var calculateHashesWorker = factory.StartNew(() => CalculateHashesImpl(...)); var comparedMatchesWorker = factory.StartNew(() => CompareMatchesImpl(...)); var syncUiWorker= factory.StartNew(() => SyncUiImpl(...)); Task.WaitAll(calculateHashesWorker, comparedMatchesWorker, syncUiWorker); 

CalculateHashesImpl():

 private void CalculateHashesImpl(string directoryPath) { foreach (var file in Directory.EnumerateFiles(directoryPath)) { var hash = CalculateHashTODO(file); calculatedHashes.Add(new Tuple(hash, file.Path)); } } 

CompareMatchesImpl():

 private void CompareMatchesImpl() { foreach (var hashEntry in calculatedHashes.GetConsumingEnumerable()) { // TODO: obviously return type is up to you string matchResult = GetMathResultTODO(hashEntry.Item1, hashEntry.Item2); comparedMatches.Add(matchResult); } } 

SyncUiImpl():

 private void UpdateUiImpl() { foreach (var matchResult in comparedMatches.GetConsumingEnumerable()) { // TODO: track progress in UI using UI framework specific features // to do not freeze it } } 

TODO:考虑使用CancellationToken作为所有GetConsumingEnumerable()调用的参数,这样您就可以在需要时轻松地停止管道执行。

首先,您应该使用更高级别的抽象来解决此问题。 您需要完成一系列任务,因此请使用“任务”抽象。 您应该使用任务并行库来执行此类操作。 让TPL处理要创建多少工作线程的问题 – 如果工作在I / O上进行门控,答案可能会低至1。

如果你想做自己的线程,一些好建议:

  • 不要阻止UI线程。 这就是冻结您的应用程序的原因。 提出一个协议,通过该协议 ,工作线程可以与您的UI线程进行通信,然后除了响应UI事件之外什么都不做。 请记住, 除了UI线程之外,任何其他线程都不能调用任务完成栏等用户界面控件的方法

  • 不要创建99个线程来读取99个文件。 这就像获得99件邮件并雇佣99名助手来回复:一个简单问题的极其昂贵的解决方案。 如果你的工作是CPU密集型的,那么“雇佣”线程没有任何意义,而不是你有CPU来维护它们。 (这就像在一个只有四张办公桌的办公室雇用99名助手。助理大部分时间都在等待桌子而不是阅读你的邮件。)如果你的工作是磁盘密集型的,那么这些大部分线程都会进行大部分时间等待磁盘闲置,这是一个更大的资源浪费。

首先,我希望您使用内置库来计算哈希值。 可以自己编写,但使用已存在一段时间的东西会更安全。

如果您的进程是CPU密集型的,则可能只需要创建与CPU一样多的线程。 如果它受I / O约束,您可能可以使用更multithreading。

我不建议将整个文件加载到内存中。 您的散列库应该支持一次更新一个块。 将一个块读入内存,用它来更新每个算法的哈希值,读取下一个块,然后重复直到文件末尾。 分块方法将有助于降低程序的内存需求。

正如其他人所建议的那样,查看任务并行库 ,特别是数据并行 。 它可能就像这样容易:

 Parallel.ForEach(fileSpecifics, item => CalculateHashes(item)); 

查看TPL数据流 。 您可以使用受限制的ActionBlock来管理您的硬件。

如果我理解您希望在后台执行某些任务而不阻止您的UI,那么UI BackgroundWorker将是一个合适的选择。 你提到你在某些时候让它工作,所以我的建议是把你在半工作状态下的东西拿走,并通过追踪失败来改进它。 如果我的预感是正确的,那么你的工作人员就会抛出一个exception,而你的代码中似乎没有这样做。 从其包含的线程中冒出的未处理的exception会导致不好的事情发生。

此代码使用两个任务来散列一个文件(流) – 一个用于读取,第二个用于散列,为了更健壮的方式,您应该向前读取更多块。

因为处理器的带宽远远高于磁盘,所以除非你使用一些高速闪存驱动器,否则你不会同时散列更多文件。

 public void TransformStream(Stream a_stream, long a_length = -1) { Debug.Assert((a_length == -1 || a_length > 0)); if (a_stream.CanSeek) { if (a_length > -1) { if (a_stream.Position + a_length > a_stream.Length) throw new IndexOutOfRangeException(); } if (a_stream.Position >= a_stream.Length) return; } System.Collections.Concurrent.ConcurrentQueue queue = new System.Collections.Concurrent.ConcurrentQueue(); System.Threading.AutoResetEvent data_ready = new System.Threading.AutoResetEvent(false); System.Threading.AutoResetEvent prepare_data = new System.Threading.AutoResetEvent(false); Task reader = Task.Factory.StartNew(() => { long total = 0; for (; ; ) { byte[] data = new byte[BUFFER_SIZE]; int readed = a_stream.Read(data, 0, data.Length); if ((a_length == -1) && (readed != BUFFER_SIZE)) data = data.SubArray(0, readed); else if ((a_length != -1) && (total + readed >= a_length)) data = data.SubArray(0, (int)(a_length - total)); total += data.Length; queue.Enqueue(data); data_ready.Set(); if (a_length == -1) { if (readed != BUFFER_SIZE) break; } else if (a_length == total) break; else if (readed != BUFFER_SIZE) throw new EndOfStreamException(); prepare_data.WaitOne(); } }); Task hasher = Task.Factory.StartNew((obj) => { IHash h = (IHash)obj; long total = 0; for (; ; ) { data_ready.WaitOne(); byte[] data; queue.TryDequeue(out data); prepare_data.Set(); total += data.Length; if ((a_length == -1) || (total < a_length)) { h.TransformBytes(data, 0, data.Length); } else { int readed = data.Length; readed = readed - (int)(total - a_length); h.TransformBytes(data, 0, data.Length); } if (a_length == -1) { if (data.Length != BUFFER_SIZE) break; } else if (a_length == total) break; else if (data.Length != BUFFER_SIZE) throw new EndOfStreamException(); } }, this); reader.Wait(); hasher.Wait(); } 

其余代码在这里: http : //hashlib.codeplex.com/SourceControl/changeset/view/71730#514336