如何从rx订阅回调异步function?

我想在Rx订阅中回调一个异步函数。

就像这样:

public class Consumer { private readonly Service _service = new Service(); public ReplaySubject Results = new ReplaySubject(); public void Trigger() { Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync()); } public Task RunAsync() { return _service.DoAsync(); } } public class Service { public async Task DoAsync() { return await Task.Run(() => Do()); } private static string Do() { Thread.Sleep(TimeSpan.FromMilliseconds(200)); throw new ArgumentException("invalid!"); return "foobar"; } } [Test] public async Task Test() { var sut = new Consumer(); sut.Trigger(); var result = await sut.Results.FirstAsync(); } 

需要做什么才能正确捕捉exception?

您不希望将async方法传递给Subscribe ,因为这将创建一个async void方法。 尽力避免async void

在您的情况下,我认为您想要的是为序列的每个元素调用async方法,然后缓存所有结果。 在这种情况下,使用SelectMany为每个元素调用async方法,并使用Replay进行缓存(加上一个Connect以获得滚动):

 public class Consumer { private readonly Service _service = new Service(); public IObservable Trigger() { var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100)) .SelectMany(_ => RunAsync()) .Replay(); connectible.Connect(); return connectible; } public Task RunAsync() { return _service.DoAsync(); } } 

我将Results属性更改为从Trigger方法返回,我认为这更有意义,因此测试现在看起来像:

 [Test] public async Task Test() { var sut = new Consumer(); var results = sut.Trigger(); var result = await results.FirstAsync(); } 

Paul Betts的回答适用于大多数情况,但是如果你想在等待异步函数完成时阻塞流,你需要这样的东西:

 Observable.Interval(TimeSpan.FromSeconds(1)) .Select(l => Observable.FromAsync(asyncMethod)) .Concat() .Subscribe(); 

要么:

 Observable.Interval(TimeSpan.FromSeconds(1)) .Select(_ => Observable.Defer(() => asyncMethod().ToObservable())) .Concat() .Subscribe(); 

将此更改为:

 Observable.Timer(TimeSpan.FromMilliseconds(100)) .SelectMany(async _ => await RunAsync()) .Subscribe(); 

订阅不会将异步操作保留在Observable中。