Reactive Extensions订阅呼叫等待

我想基于Reactive Extensions Observable引发的每个事件执行异步调用。 我也试图让所有内容保持同步,因为我希望在处理下一个事件之前完成异步调用。

怎么会去做类似以下的事情呢? 我说类似下面的代码不编译。

settingsChangedInMemory .Subscribe(async _ => { var settings = Extract(); await SaveSettings(settings); }); 

我不确定它是否会改变任何东西,但我需要订阅多个Observable。 例如,像这样的另一个订阅。

 settingsChangedOnDisk .Subscribe(async _ => { var settings = await ReadSettings(settings); Apply(settings); }); 

您将如何使用Reactive Extensions执行此操作?

怎么样:

 settingsChangedInMemory .SelectMany(async _ => await SaveSettings(Extract())) .Subscribe(x => Apply(x)); 

永远不要在Subscribe async ,您总是希望将它放在SelectMany

您可以使用Reactive Extensions(Rx)2.0中发布的新ForEachAsync方法,如下所示:

 await observable .ForEachAsync(async x => { Console.WriteLine(x); await Task.Delay(1000); }); 

ForEachAsync返回一个Task ,它在observable完成时完成。 我的博客文章中的更多信息或react native扩展团队的此博客文章 。