如何从rx订阅回调异步function?
我想在Rx订阅中回调一个异步函数。
就像这样:
public class Consumer { private readonly Service _service = new Service(); public ReplaySubject Results = new ReplaySubject(); public void Trigger() { Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync()); } public Task RunAsync() { return _service.DoAsync(); } } public class Service { public async Task DoAsync() { return await Task.Run(() => Do()); } private static string Do() { Thread.Sleep(TimeSpan.FromMilliseconds(200)); throw new ArgumentException("invalid!"); return "foobar"; } } [Test] public async Task Test() { var sut = new Consumer(); sut.Trigger(); var result = await sut.Results.FirstAsync(); }
需要做什么才能正确捕捉exception?
您不希望将async
方法传递给Subscribe
,因为这将创建一个async void
方法。 尽力避免async void
。
在您的情况下,我认为您想要的是为序列的每个元素调用async
方法,然后缓存所有结果。 在这种情况下,使用SelectMany
为每个元素调用async
方法,并使用Replay
进行缓存(加上一个Connect
以获得滚动):
public class Consumer { private readonly Service _service = new Service(); public IObservable Trigger() { var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100)) .SelectMany(_ => RunAsync()) .Replay(); connectible.Connect(); return connectible; } public Task RunAsync() { return _service.DoAsync(); } }
我将Results
属性更改为从Trigger
方法返回,我认为这更有意义,因此测试现在看起来像:
[Test] public async Task Test() { var sut = new Consumer(); var results = sut.Trigger(); var result = await results.FirstAsync(); }
Paul Betts的回答适用于大多数情况,但是如果你想在等待异步函数完成时阻塞流,你需要这样的东西:
Observable.Interval(TimeSpan.FromSeconds(1)) .Select(l => Observable.FromAsync(asyncMethod)) .Concat() .Subscribe();
要么:
Observable.Interval(TimeSpan.FromSeconds(1)) .Select(_ => Observable.Defer(() => asyncMethod().ToObservable())) .Concat() .Subscribe();
将此更改为:
Observable.Timer(TimeSpan.FromMilliseconds(100)) .SelectMany(async _ => await RunAsync()) .Subscribe();
订阅不会将异步操作保留在Observable中。