如何使用Reactive Extensions使用最大窗口大小来限制事件?

场景

我正在构建一个UI应用程序,每隔几毫秒从后端服务获取通知。 收到新通知后,我想尽快更新用户界面。

因为我可以在很短的时间内收到大量的通知,而且我总是只关心最新的事件,所以我使用了Reactive Extensions框架的Throttle()方法。 这允许我忽略紧跟新通知的通知事件,因此我的UI保持响应。

问题:

假设我将通知事件的事件流限制为50ms,并且后端每10ms发送一次通知,Thottle()方法将永远不会返回事件,因为它会一次又一次地重置其滑动窗口。 在这里,我需要一些额外的行为来指定像超时这样的东西,这样我就可以每秒至少检索一个事件,如果有这么高的事件吞吐量。 如何使用Reactive Extensions执行此操作?

正如詹姆斯所说, Observable.Sample将为您提供最新的价值。 但是,它会在计时器上执行,而不是根据油门中的第一个事件发生的时间。 然而,更重要的是,如果您的采样时间很长(例如十秒),并且您的事件在采样后立即触发,那么您将不会在接下来的十秒内获得该新事件。

如果你需要更紧凑的东西,你需要实现自己的function。 我冒昧地这样做了。 这段代码绝对可以使用一些清理,但我相信它可以满足您的要求。

 public static class ObservableEx { public static IObservable ThrottleMax(this IObservable source, TimeSpan dueTime, TimeSpan maxTime) { return source.ThrottleMax(dueTime, maxTime, Scheduler.Default); } public static IObservable ThrottleMax(this IObservable source, TimeSpan dueTime, TimeSpan maxTime, IScheduler scheduler) { return Observable.Create(o => { var hasValue = false; T value = default(T); var maxTimeDisposable = new SerialDisposable(); var dueTimeDisposable = new SerialDisposable(); Action action = () => { if (hasValue) { maxTimeDisposable.Disposable = Disposable.Empty; dueTimeDisposable.Disposable = Disposable.Empty; o.OnNext(value); hasValue = false; } }; return source.Subscribe( x => { if (!hasValue) { maxTimeDisposable.Disposable = scheduler.Schedule(maxTime, action); } hasValue = true; value = x; dueTimeDisposable.Disposable = scheduler.Schedule(dueTime, action); }, o.OnError, o.OnCompleted ); }); } } 

还有一些测试……

 [TestClass] public class ThrottleMaxTests : ReactiveTest { [TestMethod] public void CanThrottle() { var scheduler = new TestScheduler(); var results = scheduler.CreateObserver(); var source = scheduler.CreateColdObservable( OnNext(100, 1) ); var dueTime = TimeSpan.FromTicks(100); var maxTime = TimeSpan.FromTicks(250); source.ThrottleMax(dueTime, maxTime, scheduler) .Subscribe(results); scheduler.AdvanceTo(1000); results.Messages.AssertEqual( OnNext(200, 1) ); } [TestMethod] public void CanThrottleWithMaximumInterval() { var scheduler = new TestScheduler(); var results = scheduler.CreateObserver(); var source = scheduler.CreateColdObservable( OnNext(100, 1), OnNext(175, 2), OnNext(250, 3), OnNext(325, 4), OnNext(400, 5) ); var dueTime = TimeSpan.FromTicks(100); var maxTime = TimeSpan.FromTicks(250); source.ThrottleMax(dueTime, maxTime, scheduler) .Subscribe(results); scheduler.AdvanceTo(1000); results.Messages.AssertEqual( OnNext(350, 4), OnNext(500, 5) ); } [TestMethod] public void CanThrottleWithoutMaximumIntervalInterferance() { var scheduler = new TestScheduler(); var results = scheduler.CreateObserver(); var source = scheduler.CreateColdObservable( OnNext(100, 1), OnNext(325, 2) ); var dueTime = TimeSpan.FromTicks(100); var maxTime = TimeSpan.FromTicks(250); source.ThrottleMax(dueTime, maxTime, scheduler) .Subscribe(results); scheduler.AdvanceTo(1000); results.Messages.AssertEqual( OnNext(200, 1), OnNext(425, 2) ); } } 

不要使用Observable.Throttle ,使用像这样的Observable.Sample ,其中TimeSpan给出了更新之间所需的最小间隔:

 source.Sample(TimeSpan.FromMilliseconds(50))