使用IObservable进行批处理

我的服务器端向我发送了批量消息。 批处理和频率中的消息是任意的。 有时我会以1分钟的间隔收到消息,有时候不会收到一小时的消息。 有时是1条消息,有时是10.我当前的实现使用Observable.Buffer(TimeSpan.FromSeconds(5))来分组消息并将消息发送给用户。

有没有一种方法可以配置Observable,如果两条消息之间有几秒钟的延迟,就可以将缓冲的消息发送给用户。

我所处的位置是避免每5秒钟不必要的计时器滴答声。 打开其他建议以优化批处理。

使用bufferClosingSelector工厂方法

decPL建议使用Buffer的重载接受bufferClosingSelector – 一个在新缓冲区打开时调用的工厂函数。 它产生一个流,其第一个OnNext()OnCompleted()信号刷新当前缓冲区。 decPL代码看起来像这样:

 observable.Buffer(() => observable.Throttle(TimeSpan.FromSeconds(5))) 

这在解决方案方面取得了相当大的进展,但它有几个问题:

  • 在活动期间,服务器不会在节流持续时间内一致地发布消息时发送消息。 这可能导致大量的,不经常发布的列表。
  • 订阅源有多个订阅; 如果感冒,可能会产生意想不到的副作用。 在每个缓冲区关闭后调用bufferClosingSelector工厂,因此如果源是冷的,它将从初始事件而不是最新事件中进行限制。

防止无限期节流

我们需要使用额外的机制来限制缓冲区长度并防止无限制的限制。 Buffer有一个允许您指定最大长度的重载,但遗憾的是您无法将其与结束选择器结合使用。

让我们调用所需的缓冲区长度限制n 。 回想一下,关闭选择器的第一个OnNext足以关闭缓冲区,所以我们需要做的就是将一个计数流Merge到一个计数流,该计数流在来自源的n个事件之后发送OnNext 。 我们可以使用.Take(n).LastAsync()来做到这一点; 采取前n个事件但忽略除最后一个之外的所有事件。 这是Rx中非常有用的模式。

使源“热”

为了解决bufferClosingSelector工厂重新订阅源的问题,我们需要在源上使用.Publish().RefCount()的通用模式为我们提供一个只向订阅者发送最新事件的流。 这也是一个非常有用的模式。

这是重新设计的代码,其中节流持续时间与计数器合并:

 var throttleDuration = TimeSpan.FromSeconds(5); var bufferSize = 3; // single subscription to source var sourcePub = source.Publish().RefCount(); var output = sourcePub.Buffer( () => sourcePub.Throttle(throttleDuration) .Merge(sourcePub.Take(bufferSize).LastAsync())); 

生产准备代码和测试

这是一个带有测试的生产就绪实现(使用nuget包rx-testing&nunit)。 请注意调度程序的参数化以支持测试。

 public static partial class ObservableExtensions { public static IObservable> BufferNearEvents( this IObservable source, TimeSpan maxInterval, int maxBufferSize, IScheduler scheduler) { if (scheduler == null) scheduler = ThreadPoolScheduler.Instance; if (maxBufferSize < = 0) throw new ArgumentOutOfRangeException( "maxBufferSize", "maxBufferSize must be positive"); var publishedSource = source.Publish().RefCount(); return publishedSource.Buffer( () => publishedSource .Throttle(maxInterval, scheduler) .Merge(publishedSource.Take(maxBufferSize).LastAsync())); } } public class BufferNearEventsTests : ReactiveTest { [Test] public void CloseEventsAreBuffered() { TimeSpan maxInterval = TimeSpan.FromTicks(200); const int maxBufferSize = 1000; var scheduler = new TestScheduler(); var source = scheduler.CreateColdObservable( OnNext(100, 1), OnNext(200, 2), OnNext(300, 3)); IList expectedBuffer = new [] {1, 2, 3}; var expectedTime = maxInterval.Ticks + 300; var results = scheduler.CreateObserver>(); source.BufferNearEvents(maxInterval, maxBufferSize, scheduler) .Subscribe(results); scheduler.AdvanceTo(1000); results.Messages.AssertEqual( OnNext>(expectedTime, buffer => CheckBuffer(expectedBuffer, buffer))); } [Test] public void FarEventsAreUnbuffered() { TimeSpan maxInterval = TimeSpan.FromTicks(200); const int maxBufferSize = 1000; var scheduler = new TestScheduler(); var source = scheduler.CreateColdObservable( OnNext(1000, 1), OnNext(2000, 2), OnNext(3000, 3)); IList[] expectedBuffers = { new[] {1}, new[] {2}, new[] {3} }; var expectedTimes = new[] { maxInterval.Ticks + 1000, maxInterval.Ticks + 2000, maxInterval.Ticks + 3000 }; var results = scheduler.CreateObserver>(); source.BufferNearEvents(maxInterval, maxBufferSize, scheduler) .Subscribe(results); scheduler.AdvanceTo(10000); results.Messages.AssertEqual( OnNext>(expectedTimes[0], buffer => CheckBuffer(expectedBuffers[0], buffer)), OnNext>(expectedTimes[1], buffer => CheckBuffer(expectedBuffers[1], buffer)), OnNext>(expectedTimes[2], buffer => CheckBuffer(expectedBuffers[2], buffer))); } [Test] public void UpToMaxEventsAreBuffered() { TimeSpan maxInterval = TimeSpan.FromTicks(200); const int maxBufferSize = 2; var scheduler = new TestScheduler(); var source = scheduler.CreateColdObservable( OnNext(100, 1), OnNext(200, 2), OnNext(300, 3)); IList[] expectedBuffers = { new[] {1,2}, new[] {3} }; var expectedTimes = new[] { 200, /* Buffer cap reached */ maxInterval.Ticks + 300 }; var results = scheduler.CreateObserver>(); source.BufferNearEvents(maxInterval, maxBufferSize, scheduler) .Subscribe(results); scheduler.AdvanceTo(10000); results.Messages.AssertEqual( OnNext>(expectedTimes[0], buffer => CheckBuffer(expectedBuffers[0], buffer)), OnNext>(expectedTimes[1], buffer => CheckBuffer(expectedBuffers[1], buffer))); } private static bool CheckBuffer(IEnumerable expected, IEnumerable actual) { CollectionAssert.AreEquivalent(expected, actual); return true; } } 

如果我理解你的描述正确, Observable.Buffer仍然是你的朋友,只是使用导致可观察事件的重载来指示何时应该发送缓冲项。 如下:

 observable.Buffer(() => observable.Throttle(TimeSpan.FromSeconds(5))) 

这是一个老问题,但它似乎与我最近的问题有关 。 Enigmativity找到了一个很好的方式去做我认为你想要达到的目标,所以我想我会分享。 我用扩展方法包装了解决方案:

 public static class ObservableExtensions { public static IObservable Batch(this IObservable observable, TimeSpan timespan) { return observable.GroupByUntil(x => 1, g => Observable.Timer(timespan)) .Select(x => x.ToArray()) .Switch(); } } 

它可以像这样使用:

 observableSource.Batch(TimeSpan.FromSeconds(5));