如何在执行Task.WhenAny时产生返回项

我的解决方案中有两个项目:WPF项目和类库。

在我的class级图书馆:

我有一个符号列表:

class Symbol { Identifier Identifier {get;set;} List HistoricalQuotes {get;set;} List HistoricalFinancials {get;set;} } 

对于每个符号,我查询金融服务,以使用webrequest检索每个符号的历史财务数据。 (webClient.DownloadStringTaskAsync(URI))

所以这是我的方法:

  public async Task<IEnumerable> GetSymbolsAsync() { var historicalFinancialTask = new List<Task>(); foreach (var symbol in await _listSymbols) { historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol)); } while (historicalFinancialTask.Count > 0) { var historicalFinancial = await Task.WhenAny(historicalFinancialTask); historicalFinancialTask.Remove(historicalFinancial); // the line below doesn't compile, which is understandable because method's return type is a Task of something yield return new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data); } } private async Task GetFinancialsQueryAsync(Symbol symbol) { var result = new HistoricalFinancialResult(); result.Symbol = symbol; result.Data = await _financialsQuery.GetFinancialsQuery(symbol.Identifier); // contains some logic like parsing and use WebClient to query asynchronously return result; } private class HistoricalFinancialResult { public Symbol Symbol { get; set; } public IEnumerable Data { get; set; } // equality members } 

正如您所看到的,我希望每次下载每个符号的财务历史数据时,都会产生结果,而不是等待我对金融服务的所有调用完成。

在我的WPF中,这就是我想做的事情:

 foreach(var symbol in await _service.GetSymbolsAsync()) { SymbolsObservableCollection.Add(symbol); } 

看来我们不能在异步方法中产生回报,那么我可以使用什么解决方案? 除了将我的GetSymbols方法移动到我的WPF项目中。

虽然我喜欢TPL Dataflow组件(svick建议您使用),但转移到该系统确实需要大量的承诺 – 这不是您可以添加到现有设计中的东西。 如果您正在执行大量CPU密集型数据处理并希望利用许多CPU内核,它可以提供相当大的好处。 但是,充分利用它是非常重要的。

他使用Rx的另一个建议可能更容易与现有解决方案集成。 (请参阅原始文档 ,但是对于最新的代码,请使用Rx-Main nuget包。或者,如果您想查看源代码,请参阅Rx CodePlex站点 )甚至可以继续调用代码如果你愿意,可以使用IEnumerable – 你可以将Rx纯粹用作实现细节,[ 编辑2013/11/09添加: ]虽然正如svick指出的那样,鉴于你的最终目标,这可能不是一个好主意。

在我向您展示一个例子之前,我想清楚一下我们究竟在做什么。 您的示例有一个带有此签名的方法:

 public async Task> GetSymbolsAsync() 

返回类型, Task> ,实质上是说“这是一个产生类型IEnumerable的单个结果的方法,它可能不会立即产生该结果。”

我认为单一结果位会让你感到悲伤,因为那并不是你想要的。 Task (无论T是什么)表示单个异步操作。 它可能有许多步骤(如果你将它作为C# async方法实现它,可以使用许多步骤)但最终会产生一件事。 你想在不同的时间产生多个东西,所以Task不适合。

如果你真的要做你的方法签名所承诺的 – 最终产生一个结果 – 你可以做的一种方法是让你的异步方法构建一个列表,然后在它好的时候产生结果:

 // Note: this first example is *not* what you want. // However, it is what your method's signature promises to do. public async Task> GetSymbolsAsync() { var historicalFinancialTask = new List>(); foreach (var symbol in await _listSymbols) { historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol)); } var results = new List(); while (historicalFinancialTask.Count > 0) { var historicalFinancial = await Task.WhenAny(historicalFinancialTask); historicalFinancialTask.Remove(historicalFinancial); results.Add(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data)); } return results; } 

此方法执行其签名所示:它异步生成一系列符号。

但是大概你想要创建一个IEnumerable来生成可用的项目,而不是等到它们全部可用。 (否则,你也可以只使用WhenAll 。)你可以这样做,但yield return不是那样的。

简而言之,我认为你想要做的是产生一个异步列表。 有一种类型: IObservable完全表达我相信你希望用你的Task> :它是一系列项目(就像IEnumerable )但是异步。

通过类比可能有助于理解它:

 public Symbol GetSymbol() ... 

是的

 public Task GetSymbolAsync() ... 

 public IEnumerable GetSymbols() ... 

是:

 public IObservable GetSymbolsObservable() ... 

(不幸的是,与Task不同,没有一个通用的命名约定来调用异步的面向序列的方法。我在这里添加了’Observable’,但这不是普遍的做法。我当然不会’ t称之为GetSymbolsAsync因为人们会期望返回Task 。)

换句话说, Task>说“当我很好并且准备就绪时我将生成这个集合”,而IObservable说:“这是一个集合。我会在我生产每个项目时”好,准备好了。“

因此,您需要一个返回一系列Symbol对象的方法,其中这些对象是异步生成的。 这告诉我们你应该真正返回一个IObservable 。 这是一个实现:

 // Unlike this first example, this *is* what you want. public IObservable GetSymbolsRx() { return Observable.Create(async obs => { var historicalFinancialTask = new List>(); foreach (var symbol in await _listSymbols) { historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol)); } while (historicalFinancialTask.Count > 0) { var historicalFinancial = await Task.WhenAny(historicalFinancialTask); historicalFinancialTask.Remove(historicalFinancial); obs.OnNext(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data)); } }); } 

正如您所看到的,这可以让您写出您希望编写的内容 – 此代码的主体几乎与您的相同。 唯一的区别是你使用yield return (它没有编译),这会调用Rx提供的对象上的OnNext方法。

编写完成后,您可以轻松地将其包装在IEnumerable ([ 编辑2013/11/29添加: ]虽然您可能实际上并不想这样做 – 请参阅答案结尾处的添加):

 public IEnumerable GetSymbols() { return GetSymbolsRx().ToEnumerable(); } 

这可能看起来不是异步的,但它实际上允许底层代码异步操作。 当您调用此方法时,它不会阻止 – 即使执行获取财务信息的基础代码不能立即生成结果,此方法仍会立即返回IEnumerable 。 当然,如果数据尚不可用,任何试图遍历该集合的代码都将最终阻塞。 但重要的是,我认为你最初想要实现的目标是:

  • 你可以编写一个执行工作的async方法(我的例子中的委托,作为参数传递给Observable.Create但如果你愿意,你可以写一个独立的async方法)
  • 仅仅因为要求您开始提取符号而不会阻止调用代码
  • 生成的IEnumerable将在可用时立即生成每个单独的项目

这是有效的,因为Rx的ToEnumerable方法中有一些聪明的代码,它弥合了IEnumerable的同步世界视图和异步生成结果之间的差距。 (换句话说,这确实让你失望的是发现C#无法为你做的。)

如果你很好奇,你可以看看来源。 可以在https://rx.codeplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs找到代码ToEnumerable基础代码。

[ 编辑2013/11/29添加: ]

svick在评论中指出我错过了一些内容:你的最终目标是将内容放入ObservableCollection 。 不知怎的,我没有看到那一点。 这意味着IEnumerable是错误的方法 – 您希望在项目可用时填充集合,而不是使用foreach循环。 所以你只需这样做:

 GetSymbolsRx().Subscribe(symbol => SymbolsObservableCollection.Add(symbol)); 

或类似的规定。 这将在集合可用时将项目添加到集合中。

这取决于顺便在UI线程上启动的整个事情。 只要它是,你的异步代码应该最终在UI线程上运行,这意味着当项目被添加到集合时,这也发生在UI线程上。 但是如果由于某种原因你最终从工作线程启动了东西(或者如果你在任何等待中使用ConfigureAwait ,从而破坏了与UI线程的连接),你需要安排处理来自Rx的项目右线程上的流:

 GetSymbolsRx() .ObserveOnDispatcher() .Subscribe(symbol => SymbolsObservableCollection.Add(symbol)); 

如果您在执行此操作时处于UI线程中,它将选择当前的调度程序,并确保所有通知都通过它进行。 如果您在订阅时已经使用了错误的线程,则可以使用带有调度程序的ObserveOn重载。 (这些要求您具有对System.Reactive.Windows.Threading的引用。这些是扩展方法,因此您需要using其包含的命名空间,也称为System.Reactive.Windows.Threading

您要求的内容没有多大意义,因为IEnumerable是一个同步接口。 换句话说,如果某个项目尚未可用,则MoveNext()方法必须阻止,它没有其他选择。

你需要的是某种IEnumerable的异步版本。 为此,您可以使用来自Rx的IObservable或来自TPL数据流的 (我最喜欢的)块。 有了它,你的代码可能看起来像这样(我也将一些变量更改为更好的名称):

 public IReceivableSourceBlock GetSymbolsAsync() { var block = new BufferBlock(); GetSymbolsAsyncCore(block).ContinueWith( task => ((IDataflowBlock)block).Fault(task.Exception), TaskContinuationOptions.NotOnRanToCompletion); return block; } private async Task GetSymbolsAsyncCore(ITargetBlock block) { // snip while (historicalFinancialTasks.Count > 0) { var historicalFinancialTask = await Task.WhenAny(historicalFinancialTasks); historicalFinancialTasks.Remove(historicalFinancialTask); var historicalFinancial = historicalFinancialTask.Result; var symbol = new Symbol( historicalFinancial.Symbol.Identifier, historicalFinancial.Symbol.HistoricalQuotes, historicalFinancial.Data); await block.SendAsync(symbol); } } 

用法可能是:

 var symbols = _service.GetSymbolsAsync(); while (await symbols.OutputAvailableAsync()) { Symbol symbol; while (symbols.TryReceive(out symbol)) SymbolsObservableCollection.Add(symbol); } 

要么:

 var symbols = _service.GetSymbolsAsync(); var addToCollectionBlock = new ActionBlock( symbol => SymbolsObservableCollection.Add(symbol)); symbols.LinkTo( addToCollectionBlock, new DataflowLinkOptions { PropagateCompletion = true }); await symbols.Completion; 

我相信你也不能将async方法作为迭代器方法。 这是.NET的限制。 看看使用任务并行库数据流 ,它可用于处理数据,因为它变得可用。 还有Reactive Extensions。

为什么不这样做:

 public async IEnumerable> GetSymbolsAsync() { var historicalFinancialTask = new List>(); foreach (var symbol in await _listSymbols) { historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol)); } while (historicalFinancialTask.Count > 0) { var historicalFinancial = await Task.WhenAny(historicalFinancialTask); historicalFinancialTask.Remove(historicalFinancial); yield return new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data); } }