Limiting concurrent requests with Rx and SelectMany

I have a list of page URLs that I want to download concurrently with HttpClient. The list can be large (100 or more!).

This is my current code:

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Reactive.Linq;

var urls = new List<string>
{
    "http://www.amazon.com",
    "http://www.bing.com",
    "http://www.facebook.com",
    "http://www.twitter.com",
    "http://www.google.com"
};

var client = new HttpClient();

var contents = urls
    .ToObservable()
    .SelectMany(uri => client.GetStringAsync(new Uri(uri, UriKind.Absolute)));

contents.Subscribe(Console.WriteLine);
```

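The problem is that SelectMany creates a large number of tasks almost at the same time. If the URL list is large enough, many of those tasks seem to time out (I get "A task was canceled" exceptions).

So I think there should be a way, perhaps using some kind of scheduler, to limit the number of concurrent tasks so that no more than 5 or 6 run at any given time.

That way I could still download concurrently without starting so many tasks that they stall, as they do now.

How can I do this without ending up with lots of timed-out tasks?

Thanks a lot.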

Remember that SelectMany() is really Select().Merge(). SelectMany has no maxConcurrent parameter, but Merge() does, so you can use that instead.

Based on your example, you could do this:

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Reactive.Linq;

var urls = new List<string>
{
    "http://www.amazon.com",
    "http://www.bing.com",
    "http://www.facebook.com",
    "http://www.twitter.com",
    "http://www.google.com"
};

var client = new HttpClient();

var contents = urls
    .ToObservable()
    // FromAsync does not start the request until Merge subscribes to it
    .Select(uri => Observable.FromAsync(() => client.GetStringAsync(uri)))
    .Merge(2); // at most 2 concurrent requests!

contents.Subscribe(Console.WriteLine);
```
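To see the difference without hitting real sites, here is a small sketch that replaces client.GetStringAsync with a hypothetical FakeDownloadAsync helper (it only counts overlapping calls and waits 50 ms) and records the peak number of calls in flight for both pipelines. It assumes the System.Reactive package and C# 9 or later for top-level statements; the helper names and the 20-item input are illustrative only.

```csharp
// Requires the System.Reactive NuGet package and C# 9+ (top-level statements).
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

var gate = new object();
int current = 0; // fake downloads currently in flight
int peak = 0;    // highest value 'current' has reached

// Hypothetical stand-in for client.GetStringAsync(url): no network access,
// it only records how many calls overlap and then waits 50 ms.
async Task<int> FakeDownloadAsync(int id)
{
    lock (gate)
    {
        current++;
        peak = Math.Max(peak, current);
    }
    await Task.Delay(50);
    lock (gate) { current--; }
    return id;
}

// Pushes 20 ids through the given pipeline, waits for it to finish,
// and returns the peak number of overlapping fake downloads.
async Task<int> MeasurePeakAsync(Func<IObservable<int>, IObservable<int>> pipeline)
{
    lock (gate) { current = 0; peak = 0; }
    await pipeline(Enumerable.Range(1, 20).ToObservable()).ToList();
    lock (gate) { return peak; }
}

// SelectMany with a Task-returning selector starts every call as soon as its id arrives.
int unbounded = await MeasurePeakAsync(ids =>
    ids.SelectMany(id => FakeDownloadAsync(id)));

// FromAsync defers each call until Merge subscribes to it,
// and Merge(2) keeps at most two inner observables subscribed at once.
int throttled = await MeasurePeakAsync(ids =>
    ids.Select(id => Observable.FromAsync(() => FakeDownloadAsync(id)))
       .Merge(2));

Console.WriteLine($"SelectMany peak: {unbounded}, Select + Merge(2) peak: {throttled}");
```

With SelectMany the peak should typically be close to the number of items, because each task starts as soon as its item arrives; with Select plus Merge(2) it should never exceed 2. Wrapping the call in Observable.FromAsync matters: something like Select(uri => client.GetStringAsync(uri).ToObservable()).Merge(2) would still start every request eagerly, and Merge would only limit how many results it listens to at once.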

Here is an example of how to do this with the TPL Dataflow API:

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    private static Task DoIt()
    {
        var urls = new List<string>
        {
            "http://www.amazon.com",
            "http://www.bing.com",
            "http://www.facebook.com",
            "http://www.twitter.com",
            "http://www.google.com"
        };
        var client = new HttpClient();

        // Create a block that takes a URL as input
        // and produces the download result as output
        TransformBlock<string, string> downloadBlock = new TransformBlock<string, string>(
            uri => client.GetStringAsync(new Uri(uri, UriKind.Absolute)),
            new ExecutionDataflowBlockOptions
            {
                // At most 2 download operations execute at the same time
                MaxDegreeOfParallelism = 2
            });

        // Create a block that prints out the result
        ActionBlock<string> doneBlock = new ActionBlock<string>(x => Console.WriteLine(x));

        // Link the output of the first block to the input of the second one
        downloadBlock.LinkTo(doneBlock, new DataflowLinkOptions { PropagateCompletion = true });

        // Feed the URLs into the first block
        foreach (var url in urls)
        {
            downloadBlock.Post(url);
        }

        downloadBlock.Complete(); // Mark completion of input

        // Allows the consumer to wait for the whole operation to complete
        return doneBlock.Completion;
    }

    static void Main(string[] args)
    {
        DoIt().Wait();
        Console.WriteLine("Done");
        Console.ReadLine();
    }
}
```
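The same kind of check can be applied to the Dataflow version. This sketch swaps the real download for a hypothetical FakeDownloadAsync helper and generated example.com URLs (both assumptions, not part of the answer above), and assumes the System.Threading.Tasks.Dataflow package and C# 9 top-level statements.

```csharp
// Requires the System.Threading.Tasks.Dataflow package and C# 9+ (top-level statements).
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var gate = new object();
int current = 0; // fake downloads currently in flight
int peak = 0;    // highest value 'current' has reached

// Hypothetical stand-in for client.GetStringAsync(url): no network access.
async Task<string> FakeDownloadAsync(string url)
{
    lock (gate)
    {
        current++;
        peak = Math.Max(peak, current);
    }
    await Task.Delay(50);
    lock (gate) { current--; }
    return "contents of " + url;
}

// MaxDegreeOfParallelism = 2 caps how many FakeDownloadAsync calls run at once.
var downloadBlock = new TransformBlock<string, string>(
    url => FakeDownloadAsync(url),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

// Default MaxDegreeOfParallelism is 1, so the list is only touched by one thread at a time.
var results = new List<string>();
var collectBlock = new ActionBlock<string>(s => results.Add(s));

downloadBlock.LinkTo(collectBlock, new DataflowLinkOptions { PropagateCompletion = true });

// Hypothetical URLs; nothing is actually requested.
foreach (var url in Enumerable.Range(1, 20).Select(i => "http://example.com/" + i))
{
    downloadBlock.Post(url);
}

downloadBlock.Complete();
await collectBlock.Completion;

Console.WriteLine($"downloads: {results.Count}, peak concurrency: {peak}");
```

Unlike the Rx version, the limit lives in the block options rather than in the pipeline shape, and the TransformBlock still emits results in input order by default even though up to two downloads overlap.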

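Can you see if this helps?

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;

var urls = new List<string>
{
    "http://www.amazon.com",
    "http://www.bing.com",
    "http://www.google.com",
    "http://www.twitter.com",
    "http://www.google.com"
};

// Note: SelectMany still subscribes to every request immediately, so this
// gives each request its own HttpClient but does not cap how many run at once.
var contents = urls
    .ToObservable()
    .SelectMany(uri => Observable
        .Using(
            () => new HttpClient(), // disposed when this request's observable terminates
            client => client
                .GetStringAsync(new Uri(uri, UriKind.Absolute))
                .ToObservable()));

contents.Subscribe(Console.WriteLine);
```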