当没有值传入时,是否有Rx方法定期重复上一个值?

我遇到的一个用例,我怀疑我不能成为唯一一个用例,如下所示:

IObservable Observable.RepeatLastValueDuringSilence(this IObservable inner, TimeSpan maxQuietPeriod); 

它将返回内部observable中的所有未来项,但是,如果内部observable在一段时间内没有调用OnNext(maxQuietPeriod),它只重复最后一个值(当然内部调用OnCompleted或OnError) 。

理由是服务定期ping出定期状态更新。 例如:

 var myStatus = Observable.FromEvent( h=>this.StatusUpdate+=h, h=>this.StatusUpdate-=h); var messageBusStatusPinger = myStatus .RepeatLastValueDuringSilence(TimeSpan.FromSeconds(1)) .Subscribe(update => _messageBus.Send(update)); 

这样的事情存在吗? 还是我过度估计它的用处?

谢谢,Alex

PS:我为任何不正确的术语/语法道歉,因为我只是第一次探索Rx。

类似于Matthew的解决方案,但是这里定时器在源中接收到每个元素之后开始,我认为这更正确(但差异不太重要):

 public static IObservable RepeatLastValueDuringSilence(this IObservable inner, TimeSpan maxQuietPeriod) { return inner.Select(x => Observable.Interval(maxQuietPeriod) .Select(_ => x) .StartWith(x) ).Switch(); } 

而且测试:

 var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "1") .Concat(Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(_ => "2")) .Concat(Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "3")); source.RepeatLastValueDuringSilence(TimeSpan.FromMilliseconds(200)).Subscribe(Console.WriteLine); 

您应该看到1次打印10次(5次来自源,5次在沉默期间重复),然后很多2从源头获得一次,另外4次从每次之间的静音,然后是无限3

这个相当简单的查询可以完成工作:

 var query = source .Select(s => Observable .Interval(TimeSpan.FromSeconds(1.0)) .StartWith(s) .Select(x => s)) .Switch(); 

永远不要低估.Switch()的力量。

我认为这样做你想要的,如果你的观察不热,你需要PublishRefcount它:

 public static IObservable RepeatLastValueDuringSilence(this IObservable inner, TimeSpan maxQuietPeriod) { var throttled = inner.Throttle(maxQuietPeriod); var repeating = throttled.SelectMany(i => Observable .Interval(maxQuietPeriod) .Select(_ => i) .TakeUntil(inner)); return Observable.Merge(inner, throttled, repeating); } 

Rx库中没有方法,但我也需要这样的方法。 在我的用例中,即使源没有输出任何值,我也需要输出值。 如果您不希望在第一个源值出现之前输出任何值,则可以在订阅调用之前删除defaultValue参数和对createTimer()的调用。

需要调度程序来运行计时器。 一个明显的重载是不需要调度程序并选择默认调度程序(我使用ThreadPool调度程序)。

 Imports System.Reactive Imports System.Reactive.Concurrency Imports System.Reactive.Disposables Imports System.Reactive.Linq  Public Function AtLeastEvery(Of T)(source As IObservable(Of T), timeout As TimeSpan, defaultValue As T, scheduler As IScheduler ) As IObservable(Of T) If source Is Nothing Then Throw New ArgumentNullException("source") If scheduler Is Nothing Then Throw New ArgumentNullException("scheduler") Return Observable.Create( Function(observer As IObserver(Of T)) Dim id As ULong Dim gate As New Object() Dim timer As New SerialDisposable() Dim lastValue As T = defaultValue Dim createTimer As Action = Sub() Dim startId As ULong = id timer.Disposable = scheduler.Schedule(timeout, Sub(self As Action(Of TimeSpan)) Dim noChange As Boolean SyncLock gate noChange = (id = startId) If noChange Then observer.OnNext(lastValue) End If End SyncLock 'only restart if no change, otherwise 'the change restarted the timeout If noChange Then self(timeout) End Sub) End Sub 'start the first timeout createTimer() 'subscribe to the source observable Dim subscription = source.Subscribe( Sub(v) SyncLock gate id += 1UL lastValue = v End SyncLock observer.OnNext(v) createTimer() 'reset the timeout End Sub, Sub(ex) SyncLock gate id += 1UL End SyncLock observer.OnError(ex) 'do not reset the timeout, because the sequence has ended End Sub, Sub() SyncLock gate id += 1UL End SyncLock observer.OnCompleted() 'do not reset the timeout, because the sequence has ended End Sub) Return New CompositeDisposable(timer, subscription) End Function) End Function