

    IObservable<T> Observable.RepeatLastValueDuringSilence<T>(this IObservable<T> inner, TimeSpan maxQuietPeriod);

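It would return all future items from the inner observable, but if the inner observable does not call OnNext for a certain period (maxQuietPeriod), it just repeats the last value (until, of course, the inner observable calls OnCompleted or OnError).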

The justification is a service that periodically pings out a status update. For example:

    var myStatus = Observable.FromEvent(
        h => this.StatusUpdate += h,
        h => this.StatusUpdate -= h);

    var messageBusStatusPinger = myStatus
        .RepeatLastValueDuringSilence(TimeSpan.FromSeconds(1))
        .Subscribe(update => _messageBus.Send(update));

Does something like this exist? Or am I overestimating its usefulness?




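    public static IObservable<T> RepeatLastValueDuringSilence<T>(this IObservable<T> inner, TimeSpan maxQuietPeriod)
    {
        // For each source value, emit it immediately and then re-emit it every
        // maxQuietPeriod; Switch() drops the old timer when the next value arrives.
        return inner.Select(x =>
            Observable.Interval(maxQuietPeriod)
                      .Select(_ => x)
                      .StartWith(x)
        ).Switch();
    }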


    var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "1")
        .Concat(Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(_ => "2"))
        .Concat(Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "3"));

    source.RepeatLastValueDuringSilence(TimeSpan.FromMilliseconds(200)).Subscribe(Console.WriteLine);



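    var query = source
        .Select(s => Observable
            .Interval(TimeSpan.FromSeconds(1.0))
            // Project the ticks to s before StartWith: Interval yields long,
            // so StartWith(s) only type-checks after the Select.
            .Select(_ => s)
            .StartWith(s))
        .Switch();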



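    public static IObservable<T> RepeatLastValueDuringSilence<T>(this IObservable<T> inner, TimeSpan maxQuietPeriod)
    {
        // Fires once, with the last value, after maxQuietPeriod of silence.
        var throttled = inner.Throttle(maxQuietPeriod);

        // After that first repeat, keep repeating until the source speaks again.
        var repeating = throttled.SelectMany(i => Observable
            .Interval(maxQuietPeriod)
            .Select(_ => i)
            .TakeUntil(inner));

        // Note: inner is subscribed to more than once here, so this suits hot sources.
        return Observable.Merge(inner, throttled, repeating);
    }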

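There is no such method in the Rx library, but I needed one as well. In my use case, I needed to output values even if the source never emits any. If you don't want any values output before the first source value arrives, you can remove the defaultValue parameter and the call to createTimer() just before the Subscribe call.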

A scheduler is needed to run the timer. An obvious overload would be one that takes no scheduler and picks a default one (I used the ThreadPool scheduler).

    Imports System.Reactive
    Imports System.Reactive.Concurrency
    Imports System.Reactive.Disposables
    Imports System.Reactive.Linq

    Public Function AtLeastEvery(Of T)(source As IObservable(Of T),
                                       timeout As TimeSpan,
                                       defaultValue As T,
                                       scheduler As IScheduler) As IObservable(Of T)
        If source Is Nothing Then Throw New ArgumentNullException("source")
        If scheduler Is Nothing Then Throw New ArgumentNullException("scheduler")

        Return Observable.Create(
            Function(observer As IObserver(Of T))
                Dim id As ULong
                Dim gate As New Object()
                Dim timer As New SerialDisposable()
                Dim lastValue As T = defaultValue

                Dim createTimer As Action =
                    Sub()
                        Dim startId As ULong = id
                        timer.Disposable = scheduler.Schedule(timeout,
                            Sub(self As Action(Of TimeSpan))
                                Dim noChange As Boolean
                                SyncLock gate
                                    noChange = (id = startId)
                                    If noChange Then
                                        observer.OnNext(lastValue)
                                    End If
                                End SyncLock
                                'only restart if no change, otherwise
                                'the change restarted the timeout
                                If noChange Then self(timeout)
                            End Sub)
                    End Sub

                'start the first timeout
                createTimer()

                'subscribe to the source observable
                Dim subscription = source.Subscribe(
                    Sub(v)
                        SyncLock gate
                            id += 1UL
                            lastValue = v
                        End SyncLock
                        observer.OnNext(v)
                        createTimer() 'reset the timeout
                    End Sub,
                    Sub(ex)
                        SyncLock gate
                            id += 1UL
                        End SyncLock
                        observer.OnError(ex)
                        'do not reset the timeout, because the sequence has ended
                    End Sub,
                    Sub()
                        SyncLock gate
                            id += 1UL
                        End SyncLock
                        observer.OnCompleted()
                        'do not reset the timeout, because the sequence has ended
                    End Sub)

                Return New CompositeDisposable(timer, subscription)
            End Function)
    End Function
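
The scheduler-less overload mentioned above could look roughly like the sketch below. It is a fragment, not a standalone program: it is meant to sit beside AtLeastEvery in the same module and relies on the same Imports. The choice of Scheduler.Default is an assumption on my part (it is the platform default in Rx 2.x and later); swap in the ThreadPool scheduler or any other IScheduler if that suits your setup better.

```vb
' Hypothetical convenience overload: forwards to the four-argument AtLeastEvery.
' Scheduler.Default is an assumed choice, not something the original answer specifies.
Public Function AtLeastEvery(Of T)(source As IObservable(Of T),
                                   timeout As TimeSpan,
                                   defaultValue As T) As IObservable(Of T)
    Return AtLeastEvery(source, timeout, defaultValue, Scheduler.Default)
End Function
```

Keeping the scheduler as an explicit parameter on the main function still pays off in tests, where a virtual-time scheduler can stand in for the real timer.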