Rx框架:在超时时执行操作,而不会中断原始的可观察序列

给定一个可观察的源,通过轮询(更改a)低级设备的状态生成…

// observable source metacode: IObservable source = Observable.Interval(TimeSpan.FromSeconds(0.5)) .Select(tick => new DeviceState(_device.ReadValue())) .DistinctUntilChanged(); 

……以及更新UI的消费者……

 // UI metacode: service.GetObservableDeviceStates() .Subscribe(state => viewModel.CurrentState = state.ToString()); 

…我需要在x秒的源“不活动”后执行自定义操作,而不会中断对源的订阅。 像这样的东西:

 // UI metacode: service.GetObservableDeviceStates() .DoOnTimeout(TimeSpan.FromSeconds(x), () => viewModel.CurrentState = "Idle") .Subscribe(state => viewModel.CurrentState = state.ToString()); 

什么是最佳做法? 想到的可能解决方案是(我是Rx noob):

  1. 缓冲区 (即使它不那么可读)
  2. 玩这个超时超载 ;
  3. 当没有任何变化(而不是使用DistinctUntilChanged)并在UI代码上处理它时,返回一些特殊的“服务端”:

    service.GetObservableDeviceStates()。Subscribe(state => viewModel.CurrentState = state.Special?“Idle”:state.ToString());

编辑:如答案中所述 ,解决方案是:

  service.GetObservableDeviceStates() .Do(onNext) .Throttle(TimeSpan.FromSeconds(x)) .Subscribe(onTimeout); 

EDIT2(警告)

如果onNext和onTimeout更新UI组件,为了避免CrossThreadExceptions需要两个 ObserveOn(uiSynchronizationContext),因为Throttle在另一个线程上工作!

  service.GetObservableDeviceStates() .ObserveOn(uiSynchronizationContext) .Do(onNext) .Throttle(TimeSpan.FromSeconds(x)) .ObserveOn(uiSynchronizationContext) .Subscribe(onTimeout); 

超时或多或少意味着代表单个异步操作的可观察对象 – 例如,如果所述observable在一定时间内没有通知您,则返回默认值或OnError

您正在寻找的运营商是Throttle ,尽管起初可能看起来不像。 Throttle(p)为您提供一个流,当源流未生成句点p的值时,该流生成一个值。

与现有代码并行,您可以使用source.Throttle(period).Do(...side effect)

我个人会为此避免使用Do方法。 它确实使这个例子中的代码相当容易,但我发现一旦使用’Do’潜入代码库,你很快就会有意大利面。

您还可以考虑使用Amb,Timer,TakeUntil,Throttle等组合来获得您正在寻找的结果并仍然保持Monad *。 或者简单来说,我假设您理想情况下希望有一系列状态值通过而不需要在您的代码中放置一个计时器(即将其卸载到服务中)。

 public IObservable GetObservableDeviceStates(TimeSpan silencePeriod) { return Observable.Create( o=> { var idle = Observable.Timer(silencePeriod).Select(_=>new DeviceStatus("Idle")); var polledStatus = Observable.Interval(TimeSpan.FromSeconds(0.5)) .Select(tick => new DeviceStatus(_device.ReadValue())) .DistinctUntilChanged() .Publish(); var subscription = (from status in polledStatus from cont in Observable.Return(status).Concat(idle.TakeUntil(polledStatus)) select cont) .Subscribe(o); return new CompositeDisposable(subscription, polledStatus.Connect()); }); } 

此代码现在使服务在指定的更改静默时间段后返回空闲状态值。

这意味着您的UI元代码保持简单,与DeviceStatus相关的逻辑保持在它所属的位置

 // UI metacode: service.GetObservableDeviceStates(TimeSpan.FromSeconds(2)) .Subscribe(state => viewModel.CurrentState = state.ToString());