一种以均匀间隔推送缓冲事件的方法

我想要实现的是缓冲来自某些IObservable的传入事件(它们以突发forms进行)并进一步释放它们,但是以偶数间隔逐个释放它们。 像这样:

-oo-ooo-oo------------------oooo-oo-o--------------> -o--o--o--o--o--o--o--------o--o--o--o--o--o--o----> 

因为我对Rx很新,所以我不确定是否已经有一个主题或运算符。 也许它可以通过构图完成?

更新:

感谢Richard Szalay指出Drain运算符,我找到了James Miles的Drain运算符使用的另一个例子 。 以下是我设法让它在WPF应用程序中工作的方式:

  .Drain(x => { Process(x); return Observable.Return(new Unit()) .Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher ); }).Subscribe(); 

我有一些乐趣,因为省略scheduler参数会导致应用程序在调试模式下崩溃而不会出现任何exception(我需要学习如何处理Rx中的exception)。 Process方法直接修改了UI状态,但我想从中生成一个IObservable非常简单(使用ISubject?)。

更新:

与此同时,我一直在试验ISubject,下面的课程做了我想要的 – 它及时让出缓冲的Ts:

 public class StepSubject : ISubject { IObserver subscriber; Queue queue = new Queue(); MutableDisposable cancel = new MutableDisposable(); TimeSpan interval; IScheduler scheduler; bool idle = true; public StepSubject(TimeSpan interval, IScheduler scheduler) { this.interval = interval; this.scheduler = scheduler; } void Step() { T next; lock (queue) { idle = queue.Count == 0; if (!idle) next = queue.Dequeue(); } if (!idle) { cancel.Disposable = scheduler.Schedule(Step, interval); subscriber.OnNext(next); } } public void OnNext(T value) { lock (queue) queue.Enqueue(value); if (idle) cancel.Disposable = scheduler.Schedule(Step); } public IDisposable Subscribe(IObserver observer) { subscriber = observer; return cancel; } } 

为了清楚起见,这个天真的实现从OnCompleted和OnError中剥离,也只允许单个订阅。

它实际上比听起来更诡异。

使用Delay不起作用,因为值仍将批量发生,只是稍微延迟。

使用IntervalCombineLatestZip不起作用,因为前者将导致跳过源值,后者将缓冲间隔值。

我认为新的Drain运算符( 在1.0.2787.0中添加 )与Delay相结合应该可以解决这个问题:

 source.Drain(x => Observable.Empty().Delay(TimeSpan.FromSeconds(1)).StartWith(x)); 

Drain运算符的工作方式与SelectMany类似,但在使用下一个值调用选择器之前一直等到上一个输出完成。 它仍然不是你所追求的(块中的第一个值也会被延迟),但它已经接近了:上面的用法现在与你的大理石图相匹配。

编辑:显然框架中的Drain不像SelectMany那样工作。 我会在官方论坛上寻求一些建议。 与此同时,这里是Drain的一个实现,它可以完成你所追求的目标:

编辑09/11:修复了实施中的错误和更新的使用情况,以匹配您请求的大理石图。

 public static class ObservableDrainExtensions { public static IObservable Drain(this IObservable source, Func> selector) { return Observable.Defer(() => { BehaviorSubject queue = new BehaviorSubject(new Unit()); return source .Zip(queue, (v, q) => v) .SelectMany(v => selector(v) .Do(_ => { }, () => queue.OnNext(new Unit())) ); }); } } 

为了完整起见,这里是理查德建议的Drain()方法的替代版(更紧凑):

 public static IObservable SelectManySequential( this IObservable source, Func> selector ) { return source .Select(x => Observable.Defer(() => selector(x))) .Concat(); } 

看线程Drain + SelectMany =? 在Rx论坛。

更新:我意识到我使用的Concat()重载是我个人Rx扩展之一(尚未)框架的一部分。 我很抱歉这个错误..当然这让我的解决方案不如我想象的那么优雅。

不过为了完整性我在这里发布了我的Conact()扩展方法重载:

 public static IObservable Concat(this IObservable> source) { return Observable.CreateWithDisposable(o => { var lockCookie = new Object(); bool completed = false; bool subscribed = false; var waiting = new Queue>(); var pendingSubscription = new MutableDisposable(); Action errorHandler = e => { o.OnError(e); pendingSubscription.Dispose(); }; Func, IDisposable> subscribe = null; subscribe = (ob) => { subscribed = true; return ob.Subscribe( o.OnNext, errorHandler, () => { lock (lockCookie) { if (waiting.Count > 0) pendingSubscription.Disposable = subscribe(waiting.Dequeue()); else if (completed) o.OnCompleted(); else subscribed = false; } } ); }; return new CompositeDisposable(pendingSubscription, source.Subscribe( n => { lock (lockCookie) { if (!subscribed) pendingSubscription.Disposable = subscribe(n); else waiting.Enqueue(n); } }, errorHandler , () => { lock (lockCookie) { completed = true; if (!subscribed) o.OnCompleted(); } } ) ); }); } 

现在用我自己的武器击败自己:同样的Concat()方法可以用Richard Szalay的精彩方式写得更优雅:

 public static IObservable Concat(this IObservable> source) { return Observable.Defer(() => { BehaviorSubject queue = new BehaviorSubject(new Unit()); return source .Zip(queue, (v, q) => v) .SelectMany(v => v.Do(_ => { }, () => queue.OnNext(new Unit())) ); }); } 

所以信用属于理查德。 🙂

这是我如何做到这一点,只是使用一个显式队列(ReactiveCollection只是WPF的ObservableCollection的一个奇特版本 – ReactiveCollection.ItemsAdded OnNext是为每个添加的项目,你可以想象):

https://github.com/xpaulbettsx/ReactiveXaml/blob/master/ReactiveXaml/ReactiveCollection.cs#L309

 public static ReactiveCollection CreateCollection(this IObservable FromObservable, TimeSpan? WithDelay = null) { var ret = new ReactiveCollection(); if (WithDelay == null) { FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(ret.Add); return ret; } // On a timer, dequeue items from queue if they are available var queue = new Queue(); var disconnect = Observable.Timer(WithDelay.Value, WithDelay.Value) .ObserveOn(RxApp.DeferredScheduler).Subscribe(_ => { if (queue.Count > 0) { ret.Add(queue.Dequeue()); } }); // When new items come in from the observable, stuff them in the queue. // Using the DeferredScheduler guarantees we'll always access the queue // from the same thread. FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(queue.Enqueue); // This is a bit clever - keep a running count of the items actually // added and compare them to the final count of items provided by the // Observable. Combine the two values, and when they're equal, // disconnect the timer ret.ItemsAdded.Scan0(0, ((acc, _) => acc+1)).Zip(FromObservable.Aggregate(0, (acc,_) => acc+1), (l,r) => (l == r)).Where(x => x != false).Subscribe(_ => disconnect.Dispose()); return ret; }