一种以均匀间隔推送缓冲事件的方法
我想要实现的是缓冲来自某些IObservable的传入事件(它们以突发forms进行)并进一步释放它们,但是以偶数间隔逐个释放它们。 像这样:
-oo-ooo-oo------------------oooo-oo-o--------------> -o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->
因为我对Rx很新,所以我不确定是否已经有一个主题或运算符。 也许它可以通过构图完成?
更新:
感谢Richard Szalay指出Drain运算符,我找到了James Miles的Drain运算符使用的另一个例子 。 以下是我设法让它在WPF应用程序中工作的方式:
.Drain(x => { Process(x); return Observable.Return(new Unit()) .Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher ); }).Subscribe();
我有一些乐趣,因为省略scheduler参数会导致应用程序在调试模式下崩溃而不会出现任何exception(我需要学习如何处理Rx中的exception)。 Process方法直接修改了UI状态,但我想从中生成一个IObservable非常简单(使用ISubject?)。
更新:
与此同时,我一直在试验ISubject,下面的课程做了我想要的 – 它及时让出缓冲的Ts:
public class StepSubject : ISubject { IObserver subscriber; Queue queue = new Queue(); MutableDisposable cancel = new MutableDisposable(); TimeSpan interval; IScheduler scheduler; bool idle = true; public StepSubject(TimeSpan interval, IScheduler scheduler) { this.interval = interval; this.scheduler = scheduler; } void Step() { T next; lock (queue) { idle = queue.Count == 0; if (!idle) next = queue.Dequeue(); } if (!idle) { cancel.Disposable = scheduler.Schedule(Step, interval); subscriber.OnNext(next); } } public void OnNext(T value) { lock (queue) queue.Enqueue(value); if (idle) cancel.Disposable = scheduler.Schedule(Step); } public IDisposable Subscribe(IObserver observer) { subscriber = observer; return cancel; } }
为了清楚起见,这个天真的实现从OnCompleted和OnError中剥离,也只允许单个订阅。
它实际上比听起来更诡异。
使用Delay
不起作用,因为值仍将批量发生,只是稍微延迟。
使用Interval
与CombineLatest
或Zip
不起作用,因为前者将导致跳过源值,后者将缓冲间隔值。
我认为新的Drain
运算符( 在1.0.2787.0中添加 )与Delay
相结合应该可以解决这个问题:
source.Drain(x => Observable.Empty().Delay(TimeSpan.FromSeconds(1)).StartWith(x));
Drain
运算符的工作方式与SelectMany
类似,但在使用下一个值调用选择器之前一直等到上一个输出完成。 它仍然不是你所追求的(块中的第一个值也会被延迟),但它已经接近了:上面的用法现在与你的大理石图相匹配。
编辑:显然框架中的Drain
不像SelectMany
那样工作。 我会在官方论坛上寻求一些建议。 与此同时,这里是Drain的一个实现,它可以完成你所追求的目标:
编辑09/11:修复了实施中的错误和更新的使用情况,以匹配您请求的大理石图。
public static class ObservableDrainExtensions { public static IObservable Drain(this IObservable source, Func> selector) { return Observable.Defer(() => { BehaviorSubject queue = new BehaviorSubject (new Unit()); return source .Zip(queue, (v, q) => v) .SelectMany(v => selector(v) .Do(_ => { }, () => queue.OnNext(new Unit())) ); }); } }
为了完整起见,这里是理查德建议的Drain()方法的替代版(更紧凑):
public static IObservable SelectManySequential( this IObservable source, Func> selector ) { return source .Select(x => Observable.Defer (() => selector(x))) .Concat(); }
看线程Drain + SelectMany =? 在Rx论坛。
更新:我意识到我使用的Concat()重载是我个人Rx扩展之一(尚未)框架的一部分。 我很抱歉这个错误..当然这让我的解决方案不如我想象的那么优雅。
不过为了完整性我在这里发布了我的Conact()扩展方法重载:
public static IObservable Concat (this IObservable> source) { return Observable.CreateWithDisposable (o => { var lockCookie = new Object(); bool completed = false; bool subscribed = false; var waiting = new Queue>(); var pendingSubscription = new MutableDisposable(); Action errorHandler = e => { o.OnError(e); pendingSubscription.Dispose(); }; Func, IDisposable> subscribe = null; subscribe = (ob) => { subscribed = true; return ob.Subscribe( o.OnNext, errorHandler, () => { lock (lockCookie) { if (waiting.Count > 0) pendingSubscription.Disposable = subscribe(waiting.Dequeue()); else if (completed) o.OnCompleted(); else subscribed = false; } } ); }; return new CompositeDisposable(pendingSubscription, source.Subscribe( n => { lock (lockCookie) { if (!subscribed) pendingSubscription.Disposable = subscribe(n); else waiting.Enqueue(n); } }, errorHandler , () => { lock (lockCookie) { completed = true; if (!subscribed) o.OnCompleted(); } } ) ); }); }
现在用我自己的武器击败自己:同样的Concat()方法可以用Richard Szalay的精彩方式写得更优雅:
public static IObservable Concat (this IObservable> source) { return Observable.Defer(() => { BehaviorSubject queue = new BehaviorSubject (new Unit()); return source .Zip(queue, (v, q) => v) .SelectMany(v => v.Do(_ => { }, () => queue.OnNext(new Unit())) ); }); }
所以信用属于理查德。 🙂
这是我如何做到这一点,只是使用一个显式队列(ReactiveCollection只是WPF的ObservableCollection的一个奇特版本 – ReactiveCollection.ItemsAdded OnNext是为每个添加的项目,你可以想象):
https://github.com/xpaulbettsx/ReactiveXaml/blob/master/ReactiveXaml/ReactiveCollection.cs#L309
public static ReactiveCollection CreateCollection (this IObservable FromObservable, TimeSpan? WithDelay = null) { var ret = new ReactiveCollection (); if (WithDelay == null) { FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(ret.Add); return ret; } // On a timer, dequeue items from queue if they are available var queue = new Queue (); var disconnect = Observable.Timer(WithDelay.Value, WithDelay.Value) .ObserveOn(RxApp.DeferredScheduler).Subscribe(_ => { if (queue.Count > 0) { ret.Add(queue.Dequeue()); } }); // When new items come in from the observable, stuff them in the queue. // Using the DeferredScheduler guarantees we'll always access the queue // from the same thread. FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(queue.Enqueue); // This is a bit clever - keep a running count of the items actually // added and compare them to the final count of items provided by the // Observable. Combine the two values, and when they're equal, // disconnect the timer ret.ItemsAdded.Scan0(0, ((acc, _) => acc+1)).Zip(FromObservable.Aggregate(0, (acc,_) => acc+1), (l,r) => (l == r)).Where(x => x != false).Subscribe(_ => disconnect.Dispose()); return ret; }