Reactive Extensions(Rx) – 当间隔中没有值时,具有最后已知值的样本

我有一个可观察的流,以不一致的间隔生成值,如下所示:

------1---2------3----------------4--------------5--- 

我希望对此进行采样,但是一旦产生了一个值,就没有任何空样本:

 ------1---2------3----------------4--------------5----- ----_----1----2----3----3----3----4----4----4----5----5 

我显然认为Replay().RefCount()可以在这里用来向Sample()提供最后一个已知的值,但由于它没有重新订阅源流,所以它没有用完。

有关如何做到这一点的任何想法?

假设你的源流是IObservable xs然后你的采样间隔是Timespan duration那么:

 xs.Publish(ps => Observable.Interval(duration) .Zip(ps.MostRecent(0), (x,y) => y) .SkipUntil(ps)) 

对于通用解决方案,使用default(T)0参数替换为MostRecent ,其中IObservable是源流类型。

Publish的目的是防止订阅副作用,因为我们需要两次订阅源 – 一次用于MostRecent ,一次用于SkipUntil 。 后者的目的是防止采样值直到源流的第一个事件。

如果您不关心在源流的第一个事件之前获取默认值,则可以简化此操作:

 Observable.Interval(duration) .Zip(xs.MostRecent(0), (x,y) => y) 

相关运营商WithLatestFrom也可能是有意义的; 这是在下一个版本中来到Rx。 详情请见此处 。