有没有办法将观察者订阅为异步
给定一个同步观察者,有没有办法做到这一点:
observable.SubscribeAsync(observer);
并且observer
所有方法都是异步调用的,或者是在创建观察者时我必须处理的东西?
您可能需要查看ObserveOn
和SubscribeOn
( 更多信息和更多信息 )。
如果您需要在流输出新值时调用异步方法,则最常见的解决方案是使用SelectMany
。 问题是,这不会等待方法完成,导致SelectMany
创建的任何任务并行运行。
如果要在等待异步函数完成时阻塞流,则需要以下内容:
Observable.Interval(TimeSpan.FromSeconds(1)) .Select(l => Observable.FromAsync(asyncMethod)) .Concat() .Subscribe();
要么:
Observable.Interval(TimeSpan.FromSeconds(1)) .Select(_ => Observable.Defer(() => asyncMethod().ToObservable())) .Concat() .Subscribe();
如果通过让异步调用观察者的方法,你的意思是你想要一个可以发布新通知但不等待处理完的先前通知的情况,那么你必须自己做这件事。 这违反了Rx的合同,因为如果您可以同时在飞行中有多个通知,则无法再保证按顺序处理通知。 我认为这种方法还有其他问题 – 这是你要小心的事情。
另一方面,如果您只想处理与创建通知的线程不同的线程上的通知,那么ObserveOn和SubscribeOn就是您要查看的内容。