Rx Throttle(…)。ObserveOn(调度程序)和Throttle(…,调度程序)之间的区别

我有以下代码:

IDisposable subscription = myObservable.Throttle(TimeSpan.FromMilliseconds(50), RxApp.MainThreadScheduler) .Subscribe(_ => UpdateUi()); 

正如所料, UpdateUi()将始终在主线程上执行。 当我将代码更改为

 IDisposable subscription = myObservable.Throttle(TimeSpan.FromMilliseconds(50)) .ObserveOn(RxApp.MainThreadScheduler) .Subscribe(_ => UpdateUi()); 

UpdateUI()将在后台线程中执行。

为什么不是Throttle(...).ObserveOn(scheduler)等同于Throttle(..., scheduler)

经过一番调查后,我认为这是由于运行时使用的Rx版本比我预期的不同(我为第三方应用程序开发了一个插件)。

我不确定为什么,但似乎默认的RxApp.MainThreadScheduler无法正确初始化。 默认实例是WaitForDispatcherScheduler ( source )。 此类中的所有函数都依赖attemptToCreateScheduler

  IScheduler attemptToCreateScheduler() { if (_innerScheduler != null) return _innerScheduler; try { _innerScheduler = _schedulerFactory(); return _innerScheduler; } catch (Exception) { // NB: Dispatcher's not ready yet. Keep using CurrentThread return CurrentThreadScheduler.Instance; } } 

在我的情况下似乎发生的是_schedulerFactory()抛出,导致返回CurrentThreadScheduler.Instance

通过手动将RxApp.MainThreadScheduler初始化为new SynchronizationContextScheduler(SynchronizationContext.Current)行为是预期的。

在代码中的两个示例中,您总是会在RxApp.MainThreadScheduler指定的调度程序上调用RxApp.MainThreadScheduler 。 我可以肯定地说这一点,因为ObserveOn是一个装饰器,可以确保在指定的调度程序上调用订阅者的OnNext处理程序。 请参阅此处进行深入分析。

所以说,这有点令人费解。 RxApp.MainThreadScheduler没有引用正确的调度程序调度程序,或者UpdateUi正在转移调度程序线程。 前者不是前所未有的 – 请参阅https://github.com/reactiveui/ReactiveUI/issues/768其他人遇到此问题。 我不知道那个案子是什么问题。 也许@PaulBetts可以权衡,或者您可以在https://github.com/reactiveui/上提出问题。 无论如何,我会仔细检查你的假设,因为我希望这是一个经过良好测试的区域。 你有完整的复制品吗?

至于您的具体问题, Throttle(...).ObserveOn(scheduler)Throttle(..., scheduler)之间的区别如下:

在第一种情况下,当没有调度程序指定Throttle时,它将使用默认平台调度程序来引入运行它的计时器所需的并发性 – 在WPF上,这将使用线程池线程。 因此,所有限制都将在后台线程上完成,并且由于以下ObserveOn ,已释放的事件将仅传递给指定调度程序上的订户。

Throttle指定调度程序的情况下,在该调度程序上完成限制 – 将在该调度程序上管理被抑制事件和已释放事件,并且也将在同一调度程序上调用订阅者。

无论哪种方式, UpdateUi将在UpdateUi上调用RxApp.MainThreadScheduler

在大多数情况下,最好在调度程序上限制ui事件,因为在后台线程上运行单独的计时器通常成本更高,如果只有一小部分事件要通过限制,则支付上下文切换。

因此,为了检查您是否遇到过RxApp.MainThreadScheduler的问题,我会尝试通过其他方法明确指定调度程序或SynchronizationContext 。 如何执行此操作取决于您所在的平台 – ObserveOnDispatcher()有望可用,或使用合适的ObserveOn重载。 在导入正确的Rx库的情况下,可以选择控件,同步转换和调度程序。