用于处理c#中文件的multithreading任务

我一直在阅读很多关于线程的内容,但无法弄清楚如何找到我的问题的解决方案。 首先让我介绍一下这个问题。 我有需要处理的文件。 主机名和文件路径位于两个arrays中。

在此处输入图像描述
现在我想设置几个线程来处理文件。 要创建的线程数基于三个因素:
A)最大线程数不能超过所有方案中唯一主机名的数量。
B) 必须按顺序处理具有相同主机名的文件。 IE我们无法同时处理host1 _file1和host1 _file2。 (数据完整性将受到威胁,这超出了我的控制范围。
C)用户可以限制可用于处理的线程数。 线程数仍受上述条件A的限制。 这纯粹是因为如果我们有大量的主机让我们说50 ..我们可能不希望同时处理50个线程。

在上面的示例中,最多可以创建6个线程。

最佳处理程序如下所示。

最佳处理例程例程

public class file_prep_obj { public string[] file_paths; public string[] hostname; public Dictionary my_dictionary; public void get_files() { hostname = new string[]{ "host1", "host1", "host1", "host2", "host2", "host3", "host4","host4","host5","host6" }; file_paths=new string[]{"C:\\host1_file1","C:\\host1_file2","C:\\host1_file3","C:\\host2_file1","C:\\host2_file2","C:\\host2_file2", "C:\\host3_file1","C:\\host4_file1","C:\\host4_file2","C:\\host5_file1","C:\\host6_file1"}; //The dictionary provides a count on the number of files that need to be processed for a particular host. my_dictionary = hostname.GroupBy(x => x) .ToDictionary(g => g.Key, g => g.Count()); } } //This class contains a list of file_paths associated with the same host. //The group_file_host_name will be the same for a host. class host_file_thread { public string[] group_file_paths; public string[] group_file_host_name; public void process_file(string file_path_in) { var time_delay_random=new Random(); Console.WriteLine("Started processing File: " + file_path_in); Task.Delay(time_delay_random.Next(3000)+1000); Console.WriteLine("Completed processing File: " + file_path_in); } } class Program { static void Main(string[] args) { file_prep_obj my_files=new file_prep_obj(); my_files.get_files(); //Create our host objects... my_files.my_dictionary.Count represents the max number of threads host_file_thread[] host_thread=new host_file_thread[my_files.my_dictionary.Count]; int key_pair_count=0; int file_path_position=0; foreach (KeyValuePair pair in my_files.my_dictionary) { host_thread[key_pair_count] = new host_file_thread(); //Initialise the host_file_thread object. Because we have an array of a customised object host_thread[key_pair_count].group_file_paths=new string[pair.Value]; //Initialise the group_file_paths host_thread[key_pair_count].group_file_host_name=new string[pair.Value]; //Initialise the group_file_host_name for(int j=0;j<pair.Value;j++) { host_thread[key_pair_count].group_file_host_name[j]=pair.Key.ToString(); //Group the hosts host_thread[key_pair_count].group_file_paths[j]=my_files.file_paths[file_path_position]; //Group the file_paths file_path_position++; } key_pair_count++; }//Close foreach (KeyValuePair pair in my_files.my_dictionary) //TODO PROCESS FILES USING host_thread objects. }//Close static void Main(string[] args) }//Close Class Program 

我想我所追求的是如何编写符合上述规范的线程处理例程的指南。

您可以使用Stephen Toub的ForEachAsync扩展方法来处理文件。 它允许您指定要使用的并发线程数,并且它是非阻塞的,因此它可以释放主线程以进行其他处理。 这是文章中的方法:

 public static Task ForEachAsync(this IEnumerable source, int dop, Func body) { return Task.WhenAll( from partition in Partitioner.Create(source).GetPartitions(dop) select Task.Run(async delegate { using (partition) while (partition.MoveNext()) await body(partition.Current); })); } 

为了使用它,我稍微重构了你的代码。 我将字典更改为Dictionary> ,它基本上将主机作为键,然后将所有路径作为值。 我假设文件路径中包含主机名。

  my_dictionary = (from h in hostname from f in file_paths where f.Contains(h) select new { Hostname = h, File = f }).GroupBy(x => x.Hostname) .ToDictionary(x => x.Key, x => x.Select(s => s.File).Distinct().ToList()); 

我还将您的process_file方法更改为async就像在其中使用Task.Delay一样,您需要await它,否则它不会执行任何操作。

 public static async Task process_file(string file_path_in) { var time_delay_random = new Random(); Console.WriteLine("Started:{0} ThreadId:{1}", file_path_in, Thread.CurrentThread.ManagedThreadId); await Task.Delay(time_delay_random.Next(3000) + 1000); Console.WriteLine("Completed:{0} ThreadId:{1}", file_path_in, Thread.CurrentThread.ManagedThreadId); } 

要使用该代码,您将获得要使用的最大线程数,并将其传递给my_files.my_dictionary.ForEachAsync 。 您还提供了一个异步委托,它处理特定主机的每个文件,并依次等待每个文件进行处理。

 public static async Task MainAsync() { var my_files = new file_prep_obj(); my_files.get_files(); const int userSuppliedMaxThread = 5; var maxThreads = Math.Min(userSuppliedMaxThread, my_files.my_dictionary.Values.Count()); Console.WriteLine("MaxThreads = " + maxThreads); foreach (var pair in my_files.my_dictionary) { foreach (var path in pair.Value) { Console.WriteLine("Key= {0}, Value={1}", pair.Key, path); } } await my_files.my_dictionary.ForEachAsync(maxThreads, async (pair) => { foreach (var path in pair.Value) { // serially process each path for a particular host. await process_file(path); } }); } static void Main(string[] args) { MainAsync().Wait(); Console.ReadKey(); }//Close static void Main(string[] args) 

输出继电器

 MaxThreads = 5 Key= host1, Value=C:\host1_file1 Key= host1, Value=C:\host1_file2 Key= host1, Value=C:\host1_file3 Key= host2, Value=C:\host2_file1 Key= host2, Value=C:\host2_file2 Key= host3, Value=C:\host3_file1 Key= host4, Value=C:\host4_file1 Key= host4, Value=C:\host4_file2 Key= host5, Value=C:\host5_file1 Key= host6, Value=C:\host6_file1 Started:C:\host1_file1 ThreadId:10 Started:C:\host2_file1 ThreadId:12 Started:C:\host3_file1 ThreadId:13 Started:C:\host4_file1 ThreadId:11 Started:C:\host5_file1 ThreadId:10 Completed:C:\host1_file1 ThreadId:13 Completed:C:\host2_file1 ThreadId:12 Started:C:\host1_file2 ThreadId:13 Started:C:\host2_file2 ThreadId:12 Completed:C:\host2_file2 ThreadId:11 Completed:C:\host1_file2 ThreadId:13 Started:C:\host6_file1 ThreadId:11 Started:C:\host1_file3 ThreadId:13 Completed:C:\host5_file1 ThreadId:11 Completed:C:\host4_file1 ThreadId:12 Completed:C:\host3_file1 ThreadId:13 Started:C:\host4_file2 ThreadId:12 Completed:C:\host1_file3 ThreadId:11 Completed:C:\host6_file1 ThreadId:13 Completed:C:\host4_file2 ThreadId:12 

我正在解决你的问题并想出了以下方法。 它可能不是最好的,但我相信它符合您的需求。

在开始之前,我是扩展方法的忠实粉丝,所以这里有一个:

 public static class IEnumerableExtensions { public static void Each(this IEnumerable ie, Action action) { var i = 0; foreach (var e in ie) action(e, i++); } } 

这样做是循环一个集合(foreach),但保留项目和索引。 你会明白为什么以后需要这个。

然后我们有变量。

 public static string[] group_file_paths = { "host1", "host1", "host1", "host2", "host2", "host3", "host4", "host4", "host5", "host6" }; public static string[] group_file_host_name = { @"c:\\host1_file1", @"c:\\host1_file2", @"c:\\host1_file3", @"c:\\host2_file1", @"c:\\host2_file2", @"c:\\host3_file1", @"c:\\host4_file1", @"c:\\host4_file2", @"c:\\host5_file1", @"c:\\host5_file2", @"c:\\host6_file1" }; 

然后是主要代码:

 public static void Main(string[] args) { Dictionary> filesToProcess = new Dictionary>(); // Loop over the 2 arrays and creates a directory that contains the host as the key, and then all the filenames. group_file_paths.Each((host, hostIndex) => { if (filesToProcess.ContainsKey(host)) { filesToProcess[host].Add(group_file_host_name[hostIndex]); } else { filesToProcess.Add(host, new List()); filesToProcess[host].Add(group_file_host_name[hostIndex]); } }); var tasks = new List(); foreach (var kvp in filesToProcess) { tasks.Add(Task.Factory.StartNew(() => { foreach (var file in kvp.Value) { process_file(kvp.Key, file); } })); } var handleTaskCompletionTask = Task.WhenAll(tasks); handleTaskCompletionTask.Wait(); } 

这里可能需要一些解释:

所以我正在创建一个字典,其中包含主机作为键,值作为值,需要处理的文件列表。

你的字典看起来像:

  • 主机1
    • 档案1
    • 档案2
  • 主持人2
    • 档案1
  • 主持人3
    • 档案1
    • 档案2
    • 档案3

之后,我正在创建一组将使用TPL执行的任务。 我现在执行所有任务,我正在等待完成所有任务。

您的流程方法如下所示,仅用于测试目的:

  public static void process_file(string host, string file) { var time_delay_random = new Random(); Console.WriteLine("Host '{0}' - Started processing the file {1}.", host, file); Thread.Sleep(time_delay_random.Next(3000) + 1000); Console.WriteLine("Host '{0}' - Completed processing the file {1}.", host, file); Console.WriteLine(""); } 

这篇文章不包括自己设置线程的方法,但可以通过在任务上使用完成处理程序轻松实现。 比任何任务完成时,您可以再次循环收集并启动尚未完成的新任务。

所以,我希望它有所帮助。

我首先要更好地组织您的数据结构。 拥有两个独立的数组不仅会增加数据重复,还会产生隐式耦合,这对于查看代码的人来说可能并不明显。

保存有关单个任务的信息的类可能类似于:

 public class TaskInfo { private readonly string _hostName; public string HostName { get { return _hostName; } } private readonly ReadOnlyCollection _files; public ReadOnlyCollection Files { get { return _files; } } public TaskInfo(string host, IEnumerable files) { _hostName = host; _files = new ReadOnlyCollection(files.ToList()); } } 

现在,创建任务列表要简单得多:

 var list = new List() { new TaskInfo( host: "host1", files: new[] { @"c:\host1\file1.txt", @"c:\host1\file2.txt" }), new TaskInfo( host: "host2", files: new[] { @"c:\host2\file1.txt", @"c:\host2\file2.txt" }) /* ... */ }; 

现在您已准备好任务,您可以简单地使用System.Threading.Tasks命名空间中的各种类来并行调用它们。 如果您真的想限制并发任务的数量,可以使用MaxDegreeOfParallelism属性:

 Parallel.ForEach( list, new ParallelOptions() { MaxDegreeOfParallelism = 10 }, taskInfo => Process(taskInfo) ); 

如果你想创建自己的线程池,你也可以使用带有多个消费者线程的ConcurrentQueue来实现类似的事情,可能在WaitHandle列表上等待它们何时完成。

我认为ThreadPool是您的完美解决方案。 它将自己处理线程并排队工作。 此外,您可以设置最大线程限制,即使您有超过最大线程数,它仍将为您的工作排队。

 ThreadPool.SetMaxThreads([YourMaxThreads],[YourMaxThreads]); foreach (var t in host_thread) { ThreadPool.QueueUserWorkItem(Foo, t); } 

 private static void Foo(object thread) { foreach (var file in (thread as host_file_thread).group_file_paths) { (thread as host_file_thread).process_file(file); } } 

虽然我建议你改变你的数据结构并保持process_file方法