将IEnumerable 转换为IObservable ,具有最大并行度
我有一系列异步任务要做(比如,获取N个网页)。 现在我想要的是将它们全部暴露为IObservable
。 我目前的解决方案使用了这个问题的答案:
async Task GetPage(string page) { Console.WriteLine("Before"); var result = await FetchFromInternet(page); Console.WriteLine("After"); return result; } // pages is an IEnumerable IObservable resultObservable =pages.Select(GetPage). Select(t => Observable.FromAsync(() => t)).Merge(); // Now consume the list foreach(ResultObj obj in resultObservable.ToEnumerable()) { Console.WriteLine(obj.ToString()); }
问题是我不知道要获取的页面数量,而且可能很大。 我不想同时发出数百个请求。 所以我想要一种方法来限制并行执行的最大任务数。 有没有办法限制GetPage
的并发调用GetPage
?
有一个Merge
重载,它接受一个maxConcurrent参数,但它似乎并没有实际限制函数invokation的并发性。 控制台在After消息之前打印所有Before消息。
注意:我需要转换回IEnumerable
。 我正在编写一个系统的数据源,它给我提取数据的描述符,我需要给它一个下载数据的列表。
编辑
以下应该有效。 此重载限制了并发订阅的数量。
var resultObservable = pages .Select(p => Observable.FromAsync(() => GetPage(p))) .Merge(maxConcurrent);
说明
为了理解为什么需要这种改变,我们需要一些背景知识
-
FromAsync
返回一个observable, 每次订阅时都会调用传递的Func
。 这意味着如果observable从未订阅,则永远不会调用它。 -
Merge
急切地读取源序列,并且仅同时订阅n
可观察量。
通过这两个部分,我们可以知道为什么原始版本将并行执行所有内容:由于(2),在Merge
决定需要订阅多少个可观察GetPage
时,已经为所有源字符串调用了GetPage
。
我们还可以看到为什么第二个版本可以工作:即使序列已经完全迭代,(1)意味着在Merge
决定它需要订阅n
observable之前不会调用GetPage
。 这导致仅同时执行n
任务的期望结果。