自定义Rx运算符仅在存在最近值时才进行限制

我正在尝试创建一个看起来非常有用的Rx运算符,但令我惊讶的是没有发现任何与Stackoverflow完全匹配的问题。 我想在Throttle上创建一个变体,如果有一段时间不活动,可以立即通过值。 我想象的用例是这样的:

我有一个下拉列表,当值发生变化时,它会启动Web请求。 如果用户按住箭头键并快速循环显示值,我不想启动每个值的请求。 但是如果我限制流,那么每次用户只需从正常方式从下拉列表中选择一个值时,用户就必须等待节流持续时间。

因此,普通的Throttle看起来像这样: 正常油门():

我想创建如下所示的ThrottleSubsequentThrottleSubsequent():

请注意,大理石1,2和6会毫不拖延地通过,因为它们各自都处于不活动状态。

我对此的尝试如下所示:

 public static IObservable ThrottleSubsequent(this IObservable source, TimeSpan dueTime, IScheduler scheduler) { // Create a timer that resets with each new source value var cooldownTimer = source .Select(x => Observable.Interval(dueTime, scheduler)) // Each source value becomes a new timer .Switch(); // Switch to the most recent timer var cooldownWindow = source.Window(() => cooldownTimer); // Pass along the first value of each cooldown window immediately var firstAfterCooldown = cooldownWindow.SelectMany(o => o.Take(1)); // Throttle the rest of the values var throttledRest = cooldownWindow .SelectMany(o => o.Skip(1)) .Throttle(dueTime, scheduler); return Observable.Merge(firstAfterCooldown, throttledRest); } 

似乎有效,但我有一个困难的时间推理这个,我觉得这里有一些边缘情况,事情可能会因重复值或其他事情而变得棘手。 我想从更有经验的Rx-errs那里得到一些关于这段代码是否正确的反馈,和/或是否有更惯用的方式来做到这一点。

好吧,这是一个测试套件(使用nuget Microsoft.Reactive.Testing ):

 var ts = new TestScheduler(); var source = ts.CreateHotObservable( new Recorded>(200.MsTicks(), Notification.CreateOnNext('A')), new Recorded>(300.MsTicks(), Notification.CreateOnNext('B')), new Recorded>(500.MsTicks(), Notification.CreateOnNext('C')), new Recorded>(510.MsTicks(), Notification.CreateOnNext('D')), new Recorded>(550.MsTicks(), Notification.CreateOnNext('E')), new Recorded>(610.MsTicks(), Notification.CreateOnNext('F')), new Recorded>(760.MsTicks(), Notification.CreateOnNext('G')) ); var target = source.ThrottleSubsequent(TimeSpan.FromMilliseconds(150), ts); var expectedResults = ts.CreateHotObservable( new Recorded>(200.MsTicks(), Notification.CreateOnNext('A')), new Recorded>(450.MsTicks(), Notification.CreateOnNext('B')), new Recorded>(500.MsTicks(), Notification.CreateOnNext('C')), new Recorded>(910.MsTicks(), Notification.CreateOnNext('G')) ); var observer = ts.CreateObserver(); target.Subscribe(observer); ts.Start(); ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages); 

和使用

 public static class TestingHelpers { public static long MsTicks(this int i) { return TimeSpan.FromMilliseconds(i).Ticks; } } 

似乎过去了。 如果你想减少它,你可以把它变成这个:

 public static IObservable ThrottleSubsequent2(this IObservable source, TimeSpan dueTime, IScheduler scheduler) { return source.Publish(_source => _source .Window(() => _source .Select(x => Observable.Interval(dueTime, scheduler)) .Switch() )) .Publish(cooldownWindow => Observable.Merge( cooldownWindow .SelectMany(o => o.Take(1)), cooldownWindow .SelectMany(o => o.Skip(1)) .Throttle(dueTime, scheduler) ) ); } 

编辑

Publish强制共享订阅。 如果您有一个带有订阅副作用的不良(或昂贵)源观察, Publish确保您只订阅一次。 以下是Publish帮助的示例:

 void Main() { var source = UglyRange(10); var target = source .SelectMany(i => Observable.Return(i).Delay(TimeSpan.FromMilliseconds(10 * i))) .ThrottleSubsequent2(TimeSpan.FromMilliseconds(70), Scheduler.Default) //Works with ThrottleSubsequent2, fails with ThrottleSubsequent .Subscribe(i => Console.WriteLine(i)); } static int counter = 0; public IObservable UglyRange(int limit) { var uglySource = Observable.Create(o => { if (counter++ == 0) { Console.WriteLine("Ugly observable should only be created once."); Enumerable.Range(1, limit).ToList().ForEach(i => o.OnNext(i)); } else { Console.WriteLine($"Ugly observable should only be created once. This is the {counter}th time created."); o.OnError(new Exception($"observable invoked {counter} times.")); } return Disposable.Empty; }); return uglySource; }