并行任务库WaitAny Design

我刚开始探索PTL并有一个设计问题。

我的场景:我有一个URL列表,每个URL都引用一个图像。 我希望每个图像并行下载。 一旦下载了至少一个图像,我想执行一个对下载的图像执行某些操作的方法。 该方法不应该并行化 – 它应该是串行的。

我认为以下内容可行,但我不确定这是否是正确的方法。 因为我有收集图像的单独类以及对收集的图像做“某事”,所以我最终传递了一系列似乎错误的任务,因为它暴露了如何检索图像的内部工作方式。 但我不知道如何解决这个问题。 实际上,这两种方法都有更多,但这对此并不重要。 只要知道它们真的不应该归结为一个大的方法,它既可以检索也可以对图像做一些事情。

//From the Director class Task[] downloadTasks = collector.RetrieveImages(listOfURLs); for (int i = 0; i < listOfURLs.Count; i++) { //Wait for any of the remaining downloads to complete int completedIndex = Task.WaitAny(downloadTasks); Image completedImage = downloadTasks[completedIndex].Result; //Now do something with the image (this "something" must happen serially) //Uses the "Formatter" class to accomplish this let's say } /////////////////////////////////////////////////// //From the Collector class public Task[] RetrieveImages(List urls) { Task[] tasks = new Task[urls.Count]; int index = 0; foreach (string url in urls) { string lambdaVar = url; //Required... Bleh tasks[index] = Task.Factory.StartNew(() => { using (WebClient client = new WebClient()) { //TODO: Replace with live image locations string fileName = String.Format("{0}.png", i); client.DownloadFile(lambdaVar, Path.Combine(Application.StartupPath, fileName)); } return Image.FromFile(Path.Combine(Application.StartupPath, fileName)); }, TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent); index++; } return tasks; } 

通常,当您不关心任何其他任务的结果时,您使用WaitAny等待一项任务。 例如,如果您只是关心第一张返回的图像。

怎么样呢。

这将创建两个任务,一个用于加载图像并将其添加到阻塞集合中。 第二个任务等待集合并处理添加到队列的任何图像。 加载所有图像后,第一个任务将关闭队列,以便第二个任务可以关闭。

 using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Drawing; using System.IO; using System.Net; using System.Threading.Tasks; namespace ClassLibrary1 { public class Class1 { readonly string _path = Directory.GetCurrentDirectory(); public void Demo() { IList listOfUrls = new List(); listOfUrls.Add("http://sofzh.miximages.com/c%23/editicon.gif"); listOfUrls.Add("http://sofzh.miximages.com/c%23/favorite-star-on.gif"); listOfUrls.Add("http://sofzh.miximages.com/c%23/arrow_dsc_green.gif"); listOfUrls.Add("http://sofzh.miximages.com/c%23/editicon.gif"); listOfUrls.Add("http://sofzh.miximages.com/c%23/favorite-star-on.gif"); listOfUrls.Add("http://sofzh.miximages.com/c%23/arrow_dsc_green.gif"); listOfUrls.Add("http://sofzh.miximages.com/c%23/editicon.gif"); listOfUrls.Add("http://sofzh.miximages.com/c%23/favorite-star-on.gif"); listOfUrls.Add("http://sofzh.miximages.com/c%23/arrow_dsc_green.gif"); BlockingCollection images = new BlockingCollection(); Parallel.Invoke( () => // Task 1: load the images { Parallel.For(0, listOfUrls.Count, (i) => { Image img = RetrieveImages(listOfUrls[i], i); img.Tag = i; images.Add(img); // Add each image to the queue }); images.CompleteAdding(); // Done with images. }, () => // Task 2: Process images serially { foreach (var img in images.GetConsumingEnumerable()) { string newPath = Path.Combine(_path, String.Format("{0}_rot.png", img.Tag)); Console.WriteLine("Rotating image {0}", img.Tag); img.RotateFlip(RotateFlipType.RotateNoneFlipXY); img.Save(newPath); } }); } public Image RetrieveImages(string url, int i) { using (WebClient client = new WebClient()) { string fileName = Path.Combine(_path, String.Format("{0}.png", i)); Console.WriteLine("Downloading {0}...", url); client.DownloadFile(url, Path.Combine(_path, fileName)); Console.WriteLine("Saving {0} as {1}.", url, fileName); return Image.FromFile(Path.Combine(_path, fileName)); } } } } 

警告:代码没有任何错误检查或取消。 现在已经很晚了,你需要做些什么呢? 🙂

这是管道模式的一个例子。 它假设获取图像非常慢并且锁定集合内部锁定的成本不会导致问题,因为与下载图像所花费的时间相比,它发生的频率相对较低。

我们的书…您可以在http://parallelpatterns.codeplex.com/上阅读有关此和其他并行编程模式的更多信息。第7章介绍了管道,随附的示例显示了具有error handling和取消的管道。

当另一个任务完成时,TPL已经提供了ContinueWith函数来执行一个任务。 任务链是TPL中用于异步操作的主要模式之一。

以下方法下载一组图像,并通过重命名每个文件继续

 static void DownloadInParallel(string[] urls) { var tempFolder = Path.GetTempPath(); var downloads = from url in urls select Task.Factory.StartNew(() =>{ using (var client = new WebClient()) { var uri = new Uri(url); string file = Path.Combine(tempFolder,uri.Segments.Last()); client.DownloadFile(uri, file); return file; } },TaskCreationOptions.LongRunning|TaskCreationOptions.AttachedToParent) .ContinueWith(t=>{ var filePath = t.Result; File.Move(filePath, filePath + ".test"); },TaskContinuationOptions.ExecuteSynchronously); var results = downloads.ToArray(); Task.WaitAll(results); } 

您还应该从ParallelExtensionsExtras示例中检查WebClient异步任务 。 DownloadXXXTask扩展方法处理任务的创建和文件的异步下载。

以下方法使用DownloadDataTask扩展来获取图像的数据并在将其保存到磁盘之前将其旋转

 static void DownloadInParallel2(string[] urls) { var tempFolder = Path.GetTempPath(); var downloads = from url in urls let uri=new Uri(url) let filePath=Path.Combine(tempFolder,uri.Segments.Last()) select new WebClient().DownloadDataTask(uri) .ContinueWith(t=>{ var img = Image.FromStream(new MemoryStream(t.Result)); img.RotateFlip(RotateFlipType.RotateNoneFlipY); img.Save(filePath); },TaskContinuationOptions.ExecuteSynchronously); var results = downloads.ToArray(); Task.WaitAll(results); } 

执行此操作的最佳方法可能是实现Observer模式:让RetreiveImages函数实现IObservable ,将“完成的图像操作”放入IObserver对象的OnNext方法中,并将其订阅到RetreiveImages

我自己还没有尝试过这个(仍然需要更多地使用任务库)但我认为这是“正确”的方法。

//下载所有图像

 private async void GetAllImages () { var downloadTasks = listOfURLs.Where(url => !string.IsNullOrEmpty(url)).Select(async url => { var ret = await RetrieveImage(url); return ret; }).ToArray(); var counts = await Task.WhenAll(downloadTasks); } //From the Collector class public async Task RetrieveImage(string url) { var lambdaVar = url; //Required... Bleh using (WebClient client = new WebClient()) { //TODO: Replace with live image locations var fileName = String.Format("{0}.png", i); await client.DownloadFile(lambdaVar, Path.Combine(Application.StartupPath, fileName)); } return Image.FromFile(Path.Combine(Application.StartupPath, fileName)); }