并行运行异步方法8次
如何将以下内容转换为Parallel.ForEach?
public async void getThreadContents(String[] threads) { HttpClient client = new HttpClient(); List usernames = new List(); int i = 0; foreach (String url in threads) { i++; progressLabel.Text = "Scanning thread " + i.ToString() + "/" + threads.Count(); HttpResponseMessage response = await client.GetAsync(url); String content = await response.Content.ReadAsStringAsync(); String user; Predicate userPredicate; foreach (Match match in regex.Matches(content)) { user = match.Groups[1].ToString(); userPredicate = (String x) => x == user; if (usernames.Find(userPredicate) != user) { usernames.Add(match.Groups[1].ToString()); } } progressBar1.PerformStep(); } }
我在假设异步和并行处理是相同的情况下对其进行编码,而我只是意识到它不是。 我看了一下我能找到的所有问题,我真的无法找到一个能为我做的例子。 其中大多数缺少可读的变量名称。 使用单字母变量名称不能解释它们包含的内容是一种说明示例的可怕方式。
我通常在名为threads的数组中包含300到2000个条目(包含论坛线程的URL),看起来并行处理(由于许多HTTP请求)会加快执行速度。
在使用Parallel.ForEach之前,是否必须删除所有异步(我在foreach之外没有任何异步,只有变量定义)? 我该怎么做呢? 我可以不阻塞主线程吗?
我顺便使用.NET 4.5。
我在假设异步和并行处理相同的情况下对其进行编码
异步处理和并行处理是完全不同的。 如果您不理解其中的差异,我认为您应该首先阅读更多相关内容(例如,c#中的异步和并行编程之间的关系是什么? )。
现在,您想要做的事情实际上并非如此简单,因为您希望以特定的并行度(8)异步处理大型集合。 使用同步处理,您可以使用Parallel.ForEach()
(以及ParallelOptions
来配置ParallelOptions
度),但没有简单的替代方法可以使用async
。
在您的代码中,由于您希望在UI线程上执行所有操作,因此这很复杂。 (尽管理想情况下,您不应该直接从计算中访问UI。相反,您应该使用IProgress
,这意味着代码不再需要在UI线程上执行。)
在.Net 4.5中执行此操作的最佳方法可能是使用TPL Dataflow。 它的ActionBlock
完全符合您的要求,但它可能非常冗长(因为它比您需要的更灵活)。 因此,创建一个辅助方法是有意义的:
public static Task AsyncParallelForEach( IEnumerable source, Func body, int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded, TaskScheduler scheduler = null) { var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism }; if (scheduler != null) options.TaskScheduler = scheduler; var block = new ActionBlock (body, options); foreach (var item in source) block.Post(item); block.Complete(); return block.Completion; }
在你的情况下,你会像这样使用它:
await AsyncParallelForEach( threads, async url => await DownloadUrl(url), 8, TaskScheduler.FromCurrentSynchronizationContext());
这里, DownloadUrl()
是一个async Task
方法,用于处理单个URL(循环体), 8
是并行度(可能不应该是实际代码中的文字常量),而FromCurrentSynchronizationContext()
确保代码在UI线程上执行。
Stephen Toub有一篇关于实现ForEachAsync
博ForEachAsync
。 对于Dataflow可用的平台,Svick的答案非常好。
这是另一种选择,使用TPL中的分区程序:
public static Task ForEachAsync(this IEnumerable source, int degreeOfParallelism, Func body) { var partitions = Partitioner.Create(source).GetPartitions(degreeOfParallelism); var tasks = partitions.Select(async partition => { using (partition) while (partition.MoveNext()) await body(partition.Current); }); return Task.WhenAll(tasks); }
然后你可以这样使用:
public async Task getThreadContentsAsync(String[] threads) { HttpClient client = new HttpClient(); ConcurrentDictionary usernames = new ConcurrentDictionary(); await threads.ForEachAsync(8, async url => { HttpResponseMessage response = await client.GetAsync(url); String content = await response.Content.ReadAsStringAsync(); String user; foreach (Match match in regex.Matches(content)) { user = match.Groups[1].ToString(); usernames.TryAdd(user, null); } progressBar1.PerformStep(); }); }
另一个替代方案是使用SemaphoreSlim
或AsyncSemaphore
(它包含在我的AsyncEx库中,并支持比SemaphoreSlim
更多的平台):
public async Task getThreadContentsAsync(String[] threads) { SemaphoreSlim semaphore = new SemaphoreSlim(8); HttpClient client = new HttpClient(); ConcurrentDictionary usernames = new ConcurrentDictionary(); await Task.WhenAll(threads.Select(async url => { await semaphore.WaitAsync(); try { HttpResponseMessage response = await client.GetAsync(url); String content = await response.Content.ReadAsStringAsync(); String user; foreach (Match match in regex.Matches(content)) { user = match.Groups[1].ToString(); usernames.TryAdd(user, null); } progressBar1.PerformStep(); } finally { semaphore.Release(); } })); }
您可以尝试AsyncEnumerator NuGet包中的ParallelForEachAsync
扩展方法:
using System.Collections.Async; public async void getThreadContents(String[] threads) { HttpClient client = new HttpClient(); List usernames = new List (); int i = 0; await threads.ParallelForEachAsync(async url => { i++; progressLabel.Text = "Scanning thread " + i.ToString() + "/" + threads.Count (); HttpResponseMessage response = await client.GetAsync(url); String content = await response.Content.ReadAsStringAsync(); String user; Predicate userPredicate; foreach (Match match in regex.Matches(content)) { user = match.Groups[1].ToString(); userPredicate = (String x) => x == user; if (usernames.Find(userPredicate) != user) { usernames.Add(match.Groups[1].ToString()); } } // THIS CALL MUST BE THREAD-SAFE! progressBar1.PerformStep(); }, maxDegreeOfParallelism: 8); }