用于处理c#中文件的multithreading任务
我一直在阅读很多关于线程的内容,但无法弄清楚如何找到我的问题的解决方案。 首先让我介绍一下这个问题。 我有需要处理的文件。 主机名和文件路径位于两个arrays中。
现在我想设置几个线程来处理文件。 要创建的线程数基于三个因素:
A)最大线程数不能超过所有方案中唯一主机名的数量。
B) 必须按顺序处理具有相同主机名的文件。 IE我们无法同时处理host1 _file1和host1 _file2。 (数据完整性将受到威胁,这超出了我的控制范围。
C)用户可以限制可用于处理的线程数。 线程数仍受上述条件A的限制。 这纯粹是因为如果我们有大量的主机让我们说50 ..我们可能不希望同时处理50个线程。
在上面的示例中,最多可以创建6个线程。
最佳处理程序如下所示。
public class file_prep_obj { public string[] file_paths; public string[] hostname; public Dictionary my_dictionary; public void get_files() { hostname = new string[]{ "host1", "host1", "host1", "host2", "host2", "host3", "host4","host4","host5","host6" }; file_paths=new string[]{"C:\\host1_file1","C:\\host1_file2","C:\\host1_file3","C:\\host2_file1","C:\\host2_file2","C:\\host2_file2", "C:\\host3_file1","C:\\host4_file1","C:\\host4_file2","C:\\host5_file1","C:\\host6_file1"}; //The dictionary provides a count on the number of files that need to be processed for a particular host. my_dictionary = hostname.GroupBy(x => x) .ToDictionary(g => g.Key, g => g.Count()); } } //This class contains a list of file_paths associated with the same host. //The group_file_host_name will be the same for a host. class host_file_thread { public string[] group_file_paths; public string[] group_file_host_name; public void process_file(string file_path_in) { var time_delay_random=new Random(); Console.WriteLine("Started processing File: " + file_path_in); Task.Delay(time_delay_random.Next(3000)+1000); Console.WriteLine("Completed processing File: " + file_path_in); } } class Program { static void Main(string[] args) { file_prep_obj my_files=new file_prep_obj(); my_files.get_files(); //Create our host objects... my_files.my_dictionary.Count represents the max number of threads host_file_thread[] host_thread=new host_file_thread[my_files.my_dictionary.Count]; int key_pair_count=0; int file_path_position=0; foreach (KeyValuePair pair in my_files.my_dictionary) { host_thread[key_pair_count] = new host_file_thread(); //Initialise the host_file_thread object. Because we have an array of a customised object host_thread[key_pair_count].group_file_paths=new string[pair.Value]; //Initialise the group_file_paths host_thread[key_pair_count].group_file_host_name=new string[pair.Value]; //Initialise the group_file_host_name for(int j=0;j<pair.Value;j++) { host_thread[key_pair_count].group_file_host_name[j]=pair.Key.ToString(); //Group the hosts host_thread[key_pair_count].group_file_paths[j]=my_files.file_paths[file_path_position]; //Group the file_paths file_path_position++; } key_pair_count++; }//Close foreach (KeyValuePair pair in my_files.my_dictionary) //TODO PROCESS FILES USING host_thread objects. }//Close static void Main(string[] args) }//Close Class Program
我想我所追求的是如何编写符合上述规范的线程处理例程的指南。
您可以使用Stephen Toub的ForEachAsync扩展方法来处理文件。 它允许您指定要使用的并发线程数,并且它是非阻塞的,因此它可以释放主线程以进行其他处理。 这是文章中的方法:
public static Task ForEachAsync(this IEnumerable source, int dop, Func body) { return Task.WhenAll( from partition in Partitioner.Create(source).GetPartitions(dop) select Task.Run(async delegate { using (partition) while (partition.MoveNext()) await body(partition.Current); })); }
为了使用它,我稍微重构了你的代码。 我将字典更改为Dictionary
,它基本上将主机作为键,然后将所有路径作为值。 我假设文件路径中包含主机名。
my_dictionary = (from h in hostname from f in file_paths where f.Contains(h) select new { Hostname = h, File = f }).GroupBy(x => x.Hostname) .ToDictionary(x => x.Key, x => x.Select(s => s.File).Distinct().ToList());
我还将您的process_file
方法更改为async
就像在其中使用Task.Delay
一样,您需要await
它,否则它不会执行任何操作。
public static async Task process_file(string file_path_in) { var time_delay_random = new Random(); Console.WriteLine("Started:{0} ThreadId:{1}", file_path_in, Thread.CurrentThread.ManagedThreadId); await Task.Delay(time_delay_random.Next(3000) + 1000); Console.WriteLine("Completed:{0} ThreadId:{1}", file_path_in, Thread.CurrentThread.ManagedThreadId); }
要使用该代码,您将获得要使用的最大线程数,并将其传递给my_files.my_dictionary.ForEachAsync
。 您还提供了一个异步委托,它处理特定主机的每个文件,并依次等待每个文件进行处理。
public static async Task MainAsync() { var my_files = new file_prep_obj(); my_files.get_files(); const int userSuppliedMaxThread = 5; var maxThreads = Math.Min(userSuppliedMaxThread, my_files.my_dictionary.Values.Count()); Console.WriteLine("MaxThreads = " + maxThreads); foreach (var pair in my_files.my_dictionary) { foreach (var path in pair.Value) { Console.WriteLine("Key= {0}, Value={1}", pair.Key, path); } } await my_files.my_dictionary.ForEachAsync(maxThreads, async (pair) => { foreach (var path in pair.Value) { // serially process each path for a particular host. await process_file(path); } }); } static void Main(string[] args) { MainAsync().Wait(); Console.ReadKey(); }//Close static void Main(string[] args)
输出继电器
MaxThreads = 5 Key= host1, Value=C:\host1_file1 Key= host1, Value=C:\host1_file2 Key= host1, Value=C:\host1_file3 Key= host2, Value=C:\host2_file1 Key= host2, Value=C:\host2_file2 Key= host3, Value=C:\host3_file1 Key= host4, Value=C:\host4_file1 Key= host4, Value=C:\host4_file2 Key= host5, Value=C:\host5_file1 Key= host6, Value=C:\host6_file1 Started:C:\host1_file1 ThreadId:10 Started:C:\host2_file1 ThreadId:12 Started:C:\host3_file1 ThreadId:13 Started:C:\host4_file1 ThreadId:11 Started:C:\host5_file1 ThreadId:10 Completed:C:\host1_file1 ThreadId:13 Completed:C:\host2_file1 ThreadId:12 Started:C:\host1_file2 ThreadId:13 Started:C:\host2_file2 ThreadId:12 Completed:C:\host2_file2 ThreadId:11 Completed:C:\host1_file2 ThreadId:13 Started:C:\host6_file1 ThreadId:11 Started:C:\host1_file3 ThreadId:13 Completed:C:\host5_file1 ThreadId:11 Completed:C:\host4_file1 ThreadId:12 Completed:C:\host3_file1 ThreadId:13 Started:C:\host4_file2 ThreadId:12 Completed:C:\host1_file3 ThreadId:11 Completed:C:\host6_file1 ThreadId:13 Completed:C:\host4_file2 ThreadId:12
我正在解决你的问题并想出了以下方法。 它可能不是最好的,但我相信它符合您的需求。
在开始之前,我是扩展方法的忠实粉丝,所以这里有一个:
public static class IEnumerableExtensions { public static void Each(this IEnumerable ie, Action action) { var i = 0; foreach (var e in ie) action(e, i++); } }
这样做是循环一个集合(foreach),但保留项目和索引。 你会明白为什么以后需要这个。
然后我们有变量。
public static string[] group_file_paths = { "host1", "host1", "host1", "host2", "host2", "host3", "host4", "host4", "host5", "host6" }; public static string[] group_file_host_name = { @"c:\\host1_file1", @"c:\\host1_file2", @"c:\\host1_file3", @"c:\\host2_file1", @"c:\\host2_file2", @"c:\\host3_file1", @"c:\\host4_file1", @"c:\\host4_file2", @"c:\\host5_file1", @"c:\\host5_file2", @"c:\\host6_file1" };
然后是主要代码:
public static void Main(string[] args) { Dictionary> filesToProcess = new Dictionary>(); // Loop over the 2 arrays and creates a directory that contains the host as the key, and then all the filenames. group_file_paths.Each((host, hostIndex) => { if (filesToProcess.ContainsKey(host)) { filesToProcess[host].Add(group_file_host_name[hostIndex]); } else { filesToProcess.Add(host, new List ()); filesToProcess[host].Add(group_file_host_name[hostIndex]); } }); var tasks = new List(); foreach (var kvp in filesToProcess) { tasks.Add(Task.Factory.StartNew(() => { foreach (var file in kvp.Value) { process_file(kvp.Key, file); } })); } var handleTaskCompletionTask = Task.WhenAll(tasks); handleTaskCompletionTask.Wait(); }
这里可能需要一些解释:
所以我正在创建一个字典,其中包含主机作为键,值作为值,需要处理的文件列表。
你的字典看起来像:
- 主机1
- 档案1
- 档案2
- 主持人2
- 档案1
- 主持人3
- 档案1
- 档案2
- 档案3
之后,我正在创建一组将使用TPL执行的任务。 我现在执行所有任务,我正在等待完成所有任务。
您的流程方法如下所示,仅用于测试目的:
public static void process_file(string host, string file) { var time_delay_random = new Random(); Console.WriteLine("Host '{0}' - Started processing the file {1}.", host, file); Thread.Sleep(time_delay_random.Next(3000) + 1000); Console.WriteLine("Host '{0}' - Completed processing the file {1}.", host, file); Console.WriteLine(""); }
这篇文章不包括自己设置线程的方法,但可以通过在任务上使用完成处理程序轻松实现。 比任何任务完成时,您可以再次循环收集并启动尚未完成的新任务。
所以,我希望它有所帮助。
我首先要更好地组织您的数据结构。 拥有两个独立的数组不仅会增加数据重复,还会产生隐式耦合,这对于查看代码的人来说可能并不明显。
保存有关单个任务的信息的类可能类似于:
public class TaskInfo { private readonly string _hostName; public string HostName { get { return _hostName; } } private readonly ReadOnlyCollection _files; public ReadOnlyCollection Files { get { return _files; } } public TaskInfo(string host, IEnumerable files) { _hostName = host; _files = new ReadOnlyCollection (files.ToList()); } }
现在,创建任务列表要简单得多:
var list = new List() { new TaskInfo( host: "host1", files: new[] { @"c:\host1\file1.txt", @"c:\host1\file2.txt" }), new TaskInfo( host: "host2", files: new[] { @"c:\host2\file1.txt", @"c:\host2\file2.txt" }) /* ... */ };
现在您已准备好任务,您可以简单地使用System.Threading.Tasks
命名空间中的各种类来并行调用它们。 如果您真的想限制并发任务的数量,可以使用MaxDegreeOfParallelism属性:
Parallel.ForEach( list, new ParallelOptions() { MaxDegreeOfParallelism = 10 }, taskInfo => Process(taskInfo) );
如果你想创建自己的线程池,你也可以使用带有多个消费者线程的ConcurrentQueue
来实现类似的事情,可能在WaitHandle
列表上等待它们何时完成。
我认为ThreadPool
是您的完美解决方案。 它将自己处理线程并排队工作。 此外,您可以设置最大线程限制,即使您有超过最大线程数,它仍将为您的工作排队。
ThreadPool.SetMaxThreads([YourMaxThreads],[YourMaxThreads]); foreach (var t in host_thread) { ThreadPool.QueueUserWorkItem(Foo, t); }
private static void Foo(object thread) { foreach (var file in (thread as host_file_thread).group_file_paths) { (thread as host_file_thread).process_file(file); } }
虽然我建议你改变你的数据结构并保持process_file
方法