C#中的循环异步任务列表

我试图不断地从几个网站解析数据。 我希望以异步方式在循环中单独执行此操作,直到程序关闭。 我不确定这种逻辑应该是什么结构。

现在我正在遵循这种模式。

async public void ParseAll(List SiteList) { List TaskList = new List(); foreach(Site s in SiteList) { TaskList.Add(s.ParseData); } await Task.WhenAll(TaskList) } 

问题是,如果我围绕此方法构造一个循环,那么首先更新的站点必须等到整个列表完成才能再次运行该方法。 从理论上讲,我想做的就是在完成ParseData方法时将每个站点放回到TaskList的底部,但我不确定这是否可行,或者这是否是最好的方法。

从理论上讲,我想做的就是在完成ParseData每个站点放回TaskList的底部

看起来您需要维护要处理的站点队列。 下面是我对此的看法,使用SemaphoreSlim 。 这样,您还可以将并发任务的数量限制为小于实际的站点数,或者即时添加新站点。 CancellationToken用于从外部停止处理。 在这里使用async void是合理的,IMO, QueueSiteAsync会跟踪它启动的任务。

 using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace AsyncLoop { class Program { public class Site { public string Url { get; set; } public async Task ParseDataAsync(CancellationToken token) { // simulate download and parse int delay = new Random(Environment.TickCount).Next(100, 1000); await Task.Delay(delay, token); Console.WriteLine("Processed: #{0}, delay: {1}", this.Url, delay); } } object _lock = new Object(); HashSet _pending = new HashSet(); // sites in progress SemaphoreSlim _semaphore; async void QueueSiteAsync(Site site, CancellationToken token) { Func processSiteAsync = async () => { await _semaphore.WaitAsync(token).ConfigureAwait(false); try { await site.ParseDataAsync(token); QueueSiteAsync(site, token); } finally { _semaphore.Release(); } }; var task = processSiteAsync(); lock (_lock) _pending.Add(task); try { await task; lock (_lock) _pending.Remove(task); } catch { if (!task.IsCanceled && !task.IsFaulted) throw; // non-task error, re-throw // leave the faulted task in the pending list and exit // ProcessAllSites will pick it up } } public async Task ProcessAllSites( Site[] sites, int maxParallel, CancellationToken token) { _semaphore = new SemaphoreSlim(Math.Min(sites.Length, maxParallel)); // start all sites foreach (var site in sites) QueueSiteAsync(site, token); // wait for cancellation try { await Task.Delay(Timeout.Infinite, token); } catch (OperationCanceledException) { } // wait for pending tasks Task[] tasks; lock (_lock) tasks = _pending.ToArray(); await Task.WhenAll(tasks); } // testing static void Main(string[] args) { // cancel processing in 10s var cts = new CancellationTokenSource(millisecondsDelay: 10000); var sites = Enumerable.Range(0, count: 10).Select(i => new Site { Url = i.ToString() }); try { new Program().ProcessAllSites( sites.ToArray(), maxParallel: 5, token: cts.Token).Wait(); } catch (AggregateException ex) { foreach (var innerEx in ex.InnerExceptions) Console.WriteLine(innerEx.Message); } } } } 

您可能还希望将下载和解析分成单独的管道,请查看此更多详细信息。

很容易创建一个循环连续循环并一遍又一遍地解析单个站点的方法。 拥有该方法后,您可以在列表中的每个站点上调用一次:

 private async void ParseSite(Site s) { while (true) { await s.ParseData(); } } public void ParseAll(List siteList) { foreach (var site in siteList) { ParseSite(site); } } 

如果你想在完成后再次访问该站点,你可能想要使用Task.WhenAny并将你的外部循环与你的内部循环集成,就像这样(假设ParseData函数将返回它正在解析的站点) :

 async public void ParseAll(List SiteList) { while (true) { List> TaskList = new List>(); foreach(Site s in SiteList) { TaskList.Add(s.ParseData()); } await Task.WhenAny(TaskList); TaskList = TaskList.Select(t => t.IsCompleted ? t.Result.ParseData() : t).ToList(); } } 

你试过PLinq lib吗?

Plinq允许您执行linq查询异步。

在你的情况下,它看起来像:

SiteList. AsParallel()ForEach (s => s.ParseData);