Rx IObservable缓冲以平滑突发事件

我有一个Observable序列,可以快速突发产生事件(即:一个接一个地发生五个事件,然后是长时间延迟,然后是另一个快速突发事件等)。 我希望通过在事件之间插入一个短暂的延迟来平滑这些突发。 想象一下以下图表作为示例:

 Raw:--oooo -------------- ooooo ----- oo ---------------- ooo |
缓冲: -  o  -  o  -  o  -  o -------- o  -  o  -  o  -  o  -  o  -  o  -  o --------- Ø - ○ - ○|

我目前的方法是通过Observable.Interval()生成类似节拍器的计时器,该计时器表示何时可以从原始流中拉出另一个事件。 问题是我无法弄清楚如何将该计时器与我的原始无缓冲可观察序列相结合。

IObservable.Zip()接近于我想做的事情,但它只有在原始流比定时器生成事件更快的情况下才有效。 一旦原始流中存在显着的间歇,计时器就会建立一系列不需要的事件,然后立即与原始流中的下一个事件突发事件配对。

理想情况下,我想要一个具有以下函数签名的IObservable扩展方法,该方法生成我上面概述的bevaior。 现在,来救我的StackOverflow 🙂

 public static IObservable Buffered(this IObservable src, TimeSpan minDelay) 

PS。 我是Rx的新手,所以我很抱歉这是一个简单的问题……


1.简单而有缺陷的方法

这是我最初的天真和简单的解决方案,它有很多问题:

 public static IObservable Buffered(this IObservable source, TimeSpan minDelay) { Queue q = new Queue(); source.Subscribe(x => q.Enqueue(x)); return Observable.Interval(minDelay).Where(_ => q.Count > 0).Select(_ => q.Dequeue()); } 

第一个明显的问题是内部订阅返回到原始源的IDisposable丢失,因此订阅无法终止。 在此方法返回的IDisposable上调用Dispose会终止计时器,但不会触发现在不必要地填充队列的基础原始事件源,而没有人留下来从队列中提取事件。

第二个问题是,从原始事件流到缓冲流,无法通过exception或流末尾通知传播 – 在订阅原始源时,它们将被忽略。

最后但并非最不重要的是,现在我已经有了定期唤醒的代码,无论是否有任何工作要做,我宁愿避免在这个美妙的新反应世界中。


2.过于复杂的方法

为了解决我最初的简单方法中遇到的问题,我编写了一个更复杂的函数,其行为与IObservable.Delay()非常相似(我使用.NET Reflector读取该代码并将其用作我函数的基础)。 不幸的是,许多样板逻辑如AnonymousObservable在system.reactive代码之外是不可公开访问的,所以我不得不复制并粘贴大量代码。 这个解决方案似乎有效,但鉴于其复杂性,我对其无bug没有信心。

我无法相信没有办法使用标准的Reactive扩展的某些组合来实现这一点。 我讨厌感觉我不必要地重新发明轮子,我试图建立的模式似乎是一个相当标准的模式。

这实际上是一种以偶数间隔推送缓冲事件的方法的副本,但我会在这里包含一个摘要(原始看起来很混乱,因为它看了几个选择)。

 public static IObservable Buffered(this IObservable source, TimeSpan minDelay) { return source.Drain(x => Observable.Empty() .Delay(minDelay) .StartWith(x) ); } 

我的Drain实现就像SelectMany一样,除了它等待先前的输出完成(你可以把它想象成ConactMany ,而SelectMany更像是MergeMany )。 内置Drain不会以这种方式工作,因此您需要包含以下实现:

 public static class ObservableDrainExtensions { public static IObservable Drain( this IObservable source, Func> selector) { return Observable.Defer(() => { BehaviorSubject queue = new BehaviorSubject(new Unit()); return source .Zip(queue, (v, q) => v) .SelectMany(v => selector(v) .Do(_ => { }, () => queue.OnNext(new Unit())) ); }); } } 
Interesting Posts