反应式扩展是否支持滚动缓冲?

我正在使用反应式扩展将数据整理到100ms的缓冲区中:

this.subscription = this.dataService .Where(x => !string.Equals("FOO", x.Key.Source)) .Buffer(TimeSpan.FromMilliseconds(100)) .ObserveOn(this.dispatcherService) .Where(x => x.Count != 0) .Subscribe(this.OnBufferReceived); 

这很好用。 但是,我想要的行为与Buffer操作提供的行为略有不同。 基本上,如果收到另一个数据项,我想重置计时器。 只有当整个100毫秒没有收到数据时我才能处理它。 这开启了永不处理数据的可能性,因此我还应该能够指定最大计数。 我会想象一下:

 .SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000) 

我已经环顾四周,在Rx中找不到这样的东西? 任何人都可以确认/否认这个吗?

我写了一个扩展来完成你所经历的大部分事 – BufferWithInactivity

这里是:

 public static IObservable> BufferWithInactivity( this IObservable source, TimeSpan inactivity, int maximumBufferSize) { return Observable.Create>(o => { var gate = new object(); var buffer = new List(); var mutable = new SerialDisposable(); var subscription = (IDisposable)null; var scheduler = Scheduler.ThreadPool; Action dump = () => { var bts = buffer.ToArray(); buffer = new List(); if (o != null) { o.OnNext(bts); } }; Action dispose = () => { if (subscription != null) { subscription.Dispose(); } mutable.Dispose(); }; Action>>> onErrorOrCompleted = onAction => { lock (gate) { dispose(); dump(); if (o != null) { onAction(o); } } }; Action onError = ex => onErrorOrCompleted(x => x.OnError(ex)); Action onCompleted = () => onErrorOrCompleted(x => x.OnCompleted()); Action onNext = t => { lock (gate) { buffer.Add(t); if (buffer.Count == maximumBufferSize) { dump(); mutable.Disposable = Disposable.Empty; } else { mutable.Disposable = scheduler.Schedule(inactivity, () => { lock (gate) { dump(); } }); } } }; subscription = source .ObserveOn(scheduler) .Subscribe(onNext, onError, onCompleted); return () => { lock (gate) { o = null; dispose(); } }; }); } 

这可以通过组合Observable的内置WindowThrottle方法实现。 首先,让我们解决忽略最大计数条件的简单问题:

 public static IObservable> BufferUntilInactive(this IObservable stream, TimeSpan delay) { var closes = stream.Throttle(delay); return stream.Window(() => closes).SelectMany(window => window.ToList()); } 

强大的Window方法完成了繁重的工作。 现在很容易看到如何添加最大数量:

 public static IObservable> BufferUntilInactive(this IObservable stream, TimeSpan delay, Int32? max=null) { var closes = stream.Throttle(delay); if (max != null) { var overflows = stream.Where((x,index) => index+1>=max); closes = closes.Merge(overflows); } return stream.Window(() => closes).SelectMany(window => window.ToList()); } 

我会在我的博客上写一篇解释这个的post。 https://gist.github.com/2244036

Window方法的文档:

使用Rx Extensions 2.0,您可以使用接受超时和大小的新缓冲区重载来满足这两个要求:

 this.subscription = this.dataService .Where(x => !string.Equals("FOO", x.Key.Source)) .Buffer(TimeSpan.FromMilliseconds(100), 1) .ObserveOn(this.dispatcherService) .Where(x => x.Count != 0) .Subscribe(this.OnBufferReceived); 

有关文档,请参阅https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx 。

我想这可以在Buffer方法之上实现,如下所示:

 public static IObservable> SlidingBuffer(this IObservable obs, TimeSpan span, int max) { return Observable.CreateWithDisposable>(cl => { var acc = new List(); return obs.Buffer(span) .Subscribe(next => { if (next.Count == 0) //no activity in time span { cl.OnNext(acc); acc.Clear(); } else { acc.AddRange(next); if (acc.Count >= max) //max items collected { cl.OnNext(acc); acc.Clear(); } } }, err => cl.OnError(err), () => { cl.OnNext(acc); cl.OnCompleted(); }); }); } 

注意:我没有测试过,但我希望它能给你这个想法。