如何在执行Task.WhenAny时产生返回项
我的解决方案中有两个项目:WPF项目和类库。
在我的class级图书馆:
我有一个符号列表:
class Symbol { Identifier Identifier {get;set;} List HistoricalQuotes {get;set;} List HistoricalFinancials {get;set;} }
对于每个符号,我查询金融服务,以使用webrequest检索每个符号的历史财务数据。 (webClient.DownloadStringTaskAsync(URI))
所以这是我的方法:
public async Task<IEnumerable> GetSymbolsAsync() { var historicalFinancialTask = new List<Task>(); foreach (var symbol in await _listSymbols) { historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol)); } while (historicalFinancialTask.Count > 0) { var historicalFinancial = await Task.WhenAny(historicalFinancialTask); historicalFinancialTask.Remove(historicalFinancial); // the line below doesn't compile, which is understandable because method's return type is a Task of something yield return new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data); } } private async Task GetFinancialsQueryAsync(Symbol symbol) { var result = new HistoricalFinancialResult(); result.Symbol = symbol; result.Data = await _financialsQuery.GetFinancialsQuery(symbol.Identifier); // contains some logic like parsing and use WebClient to query asynchronously return result; } private class HistoricalFinancialResult { public Symbol Symbol { get; set; } public IEnumerable Data { get; set; } // equality members }
正如您所看到的,我希望每次下载每个符号的财务历史数据时,都会产生结果,而不是等待我对金融服务的所有调用完成。
在我的WPF中,这就是我想做的事情:
foreach(var symbol in await _service.GetSymbolsAsync()) { SymbolsObservableCollection.Add(symbol); }
看来我们不能在异步方法中产生回报,那么我可以使用什么解决方案? 除了将我的GetSymbols方法移动到我的WPF项目中。
虽然我喜欢TPL Dataflow组件(svick建议您使用),但转移到该系统确实需要大量的承诺 – 这不是您可以添加到现有设计中的东西。 如果您正在执行大量CPU密集型数据处理并希望利用许多CPU内核,它可以提供相当大的好处。 但是,充分利用它是非常重要的。
他使用Rx的另一个建议可能更容易与现有解决方案集成。 (请参阅原始文档 ,但是对于最新的代码,请使用Rx-Main nuget包。或者,如果您想查看源代码,请参阅Rx CodePlex站点 )甚至可以继续调用代码如果你愿意,可以使用IEnumerable
– 你可以将Rx纯粹用作实现细节,[ 编辑2013/11/09添加: ]虽然正如svick指出的那样,鉴于你的最终目标,这可能不是一个好主意。
在我向您展示一个例子之前,我想清楚一下我们究竟在做什么。 您的示例有一个带有此签名的方法:
public async Task> GetSymbolsAsync()
返回类型, Task
,实质上是说“这是一个产生类型IEnumerable
的单个结果的方法,它可能不会立即产生该结果。”
我认为单一结果位会让你感到悲伤,因为那并不是你想要的。 Task
(无论T
是什么)表示单个异步操作。 它可能有许多步骤(如果你将它作为C# async
方法实现它,可以使用许多步骤)但最终会产生一件事。 你想在不同的时间产生多个东西,所以Task
不适合。
如果你真的要做你的方法签名所承诺的 – 最终产生一个结果 – 你可以做的一种方法是让你的异步方法构建一个列表,然后在它好的时候产生结果:
// Note: this first example is *not* what you want. // However, it is what your method's signature promises to do. public async Task> GetSymbolsAsync() { var historicalFinancialTask = new List>(); foreach (var symbol in await _listSymbols) { historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol)); } var results = new List(); while (historicalFinancialTask.Count > 0) { var historicalFinancial = await Task.WhenAny(historicalFinancialTask); historicalFinancialTask.Remove(historicalFinancial); results.Add(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data)); } return results; }
此方法执行其签名所示:它异步生成一系列符号。
但是大概你想要创建一个IEnumerable
来生成可用的项目,而不是等到它们全部可用。 (否则,你也可以只使用WhenAll
。)你可以这样做,但yield return
不是那样的。
简而言之,我认为你想要做的是产生一个异步列表。 有一种类型: IObservable
完全表达我相信你希望用你的Task
:它是一系列项目(就像IEnumerable
)但是异步。
通过类比可能有助于理解它:
public Symbol GetSymbol() ...
是的
public Task GetSymbolAsync() ...
如
public IEnumerable GetSymbols() ...
是:
public IObservable GetSymbolsObservable() ...
(不幸的是,与Task
不同,没有一个通用的命名约定来调用异步的面向序列的方法。我在这里添加了’Observable’,但这不是普遍的做法。我当然不会’ t称之为GetSymbolsAsync
因为人们会期望返回Task
。)
换句话说, Task
说“当我很好并且准备就绪时我将生成这个集合”,而IObservable
说:“这是一个集合。我会在我生产每个项目时”好,准备好了。“
因此,您需要一个返回一系列Symbol
对象的方法,其中这些对象是异步生成的。 这告诉我们你应该真正返回一个IObservable
。 这是一个实现:
// Unlike this first example, this *is* what you want. public IObservable GetSymbolsRx() { return Observable.Create (async obs => { var historicalFinancialTask = new List>(); foreach (var symbol in await _listSymbols) { historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol)); } while (historicalFinancialTask.Count > 0) { var historicalFinancial = await Task.WhenAny(historicalFinancialTask); historicalFinancialTask.Remove(historicalFinancial); obs.OnNext(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data)); } }); }
正如您所看到的,这可以让您写出您希望编写的内容 – 此代码的主体几乎与您的相同。 唯一的区别是你使用yield return
(它没有编译),这会调用Rx提供的对象上的OnNext
方法。
编写完成后,您可以轻松地将其包装在IEnumerable
([ 编辑2013/11/29添加: ]虽然您可能实际上并不想这样做 – 请参阅答案结尾处的添加):
public IEnumerable GetSymbols() { return GetSymbolsRx().ToEnumerable(); }
这可能看起来不是异步的,但它实际上允许底层代码异步操作。 当您调用此方法时,它不会阻止 – 即使执行获取财务信息的基础代码不能立即生成结果,此方法仍会立即返回IEnumerable
。 当然,如果数据尚不可用,任何试图遍历该集合的代码都将最终阻塞。 但重要的是,我认为你最初想要实现的目标是:
- 你可以编写一个执行工作的
async
方法(我的例子中的委托,作为参数传递给Observable.Create
但如果你愿意,你可以写一个独立的async
方法) - 仅仅因为要求您开始提取符号而不会阻止调用代码
- 生成的
IEnumerable
将在可用时立即生成每个单独的项目
这是有效的,因为Rx的ToEnumerable
方法中有一些聪明的代码,它弥合了IEnumerable
的同步世界视图和异步生成结果之间的差距。 (换句话说,这确实让你失望的是发现C#无法为你做的。)
如果你很好奇,你可以看看来源。 可以在https://rx.codeplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs找到代码ToEnumerable
基础代码。
[ 编辑2013/11/29添加: ]
svick在评论中指出我错过了一些内容:你的最终目标是将内容放入ObservableCollection
。 不知怎的,我没有看到那一点。 这意味着IEnumerable
是错误的方法 – 您希望在项目可用时填充集合,而不是使用foreach
循环。 所以你只需这样做:
GetSymbolsRx().Subscribe(symbol => SymbolsObservableCollection.Add(symbol));
或类似的规定。 这将在集合可用时将项目添加到集合中。
这取决于顺便在UI线程上启动的整个事情。 只要它是,你的异步代码应该最终在UI线程上运行,这意味着当项目被添加到集合时,这也发生在UI线程上。 但是如果由于某种原因你最终从工作线程启动了东西(或者如果你在任何等待中使用ConfigureAwait
,从而破坏了与UI线程的连接),你需要安排处理来自Rx的项目右线程上的流:
GetSymbolsRx() .ObserveOnDispatcher() .Subscribe(symbol => SymbolsObservableCollection.Add(symbol));
如果您在执行此操作时处于UI线程中,它将选择当前的调度程序,并确保所有通知都通过它进行。 如果您在订阅时已经使用了错误的线程,则可以使用带有调度程序的ObserveOn
重载。 (这些要求您具有对System.Reactive.Windows.Threading
的引用。这些是扩展方法,因此您需要using
其包含的命名空间,也称为System.Reactive.Windows.Threading
)
您要求的内容没有多大意义,因为IEnumerable
是一个同步接口。 换句话说,如果某个项目尚未可用,则MoveNext()
方法必须阻止,它没有其他选择。
你需要的是某种IEnumerable
的异步版本。 为此,您可以使用来自Rx的IObservable
或来自TPL数据流的 (我最喜欢的)块。 有了它,你的代码可能看起来像这样(我也将一些变量更改为更好的名称):
public IReceivableSourceBlock GetSymbolsAsync() { var block = new BufferBlock (); GetSymbolsAsyncCore(block).ContinueWith( task => ((IDataflowBlock)block).Fault(task.Exception), TaskContinuationOptions.NotOnRanToCompletion); return block; } private async Task GetSymbolsAsyncCore(ITargetBlock block) { // snip while (historicalFinancialTasks.Count > 0) { var historicalFinancialTask = await Task.WhenAny(historicalFinancialTasks); historicalFinancialTasks.Remove(historicalFinancialTask); var historicalFinancial = historicalFinancialTask.Result; var symbol = new Symbol( historicalFinancial.Symbol.Identifier, historicalFinancial.Symbol.HistoricalQuotes, historicalFinancial.Data); await block.SendAsync(symbol); } }
用法可能是:
var symbols = _service.GetSymbolsAsync(); while (await symbols.OutputAvailableAsync()) { Symbol symbol; while (symbols.TryReceive(out symbol)) SymbolsObservableCollection.Add(symbol); }
要么:
var symbols = _service.GetSymbolsAsync(); var addToCollectionBlock = new ActionBlock( symbol => SymbolsObservableCollection.Add(symbol)); symbols.LinkTo( addToCollectionBlock, new DataflowLinkOptions { PropagateCompletion = true }); await symbols.Completion;
我相信你也不能将async
方法作为迭代器方法。 这是.NET的限制。 看看使用任务并行库数据流 ,它可用于处理数据,因为它变得可用。 还有Reactive Extensions。
为什么不这样做:
public async IEnumerable> GetSymbolsAsync() { var historicalFinancialTask = new List>(); foreach (var symbol in await _listSymbols) { historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol)); } while (historicalFinancialTask.Count > 0) { var historicalFinancial = await Task.WhenAny(historicalFinancialTask); historicalFinancialTask.Remove(historicalFinancial); yield return new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data); } }