Rx:我如何立即响应,并限制后续请求

我想建立一个可以立即响应事件的Rx订阅,然后忽略在指定的“冷却”时段内发生的后续事件。

开箱即用的Throttle / Buffer方法只在超时过后响应,这不是我需要的。

下面是一些设置场景的代码,并使用Throttle(这不是我想要的解决方案):

class Program { static Stopwatch sw = new Stopwatch(); static void Main(string[] args) { var subject = new Subject(); var timeout = TimeSpan.FromMilliseconds(500); subject .Throttle(timeout) .Subscribe(DoStuff); var factory = new TaskFactory(); sw.Start(); factory.StartNew(() => { Console.WriteLine("Batch 1 (no delay)"); subject.OnNext(1); }); factory.StartNewDelayed(1000, () => { Console.WriteLine("Batch 2 (1s delay)"); subject.OnNext(2); }); factory.StartNewDelayed(1300, () => { Console.WriteLine("Batch 3 (1.3s delay)"); subject.OnNext(3); }); factory.StartNewDelayed(1600, () => { Console.WriteLine("Batch 4 (1.6s delay)"); subject.OnNext(4); }); Console.ReadKey(); sw.Stop(); } private static void DoStuff(int i) { Console.WriteLine("Handling {0} at {1}ms", i, sw.ElapsedMilliseconds); } } 

现在运行此输出的输出是:

批次1(无延迟)

在508ms处理1

批次2(1秒延迟)

批次3(1.3s延迟)

第4批(1.6秒延迟)

处理4在2114ms

请注意,批处理2未处理(这很好!)因为由于节流的性质,我们在请求之间等待500毫秒。 批处理3也没有处理(由于它接近批处理4,因此它从批处理2发生的时间超过500毫秒)。

我正在寻找的是更像这样的东西:

批次1(无延迟)

在~0ms处理1

批次2(1秒延迟)

处理2~1000s

批次3(1.3s延迟)

第4批(1.6秒延迟)

在约1600时处理4

请注意,在这种情况下不会处理批处理3(这很好!),因为它发生在批处理2的500毫秒内。

编辑

这是我使用的“StartNewDelayed”扩展方法的实现:

 /// Creates a Task that will complete after the specified delay. /// The TaskFactory. /// The delay after which the Task should transition to RanToCompletion. /// A Task that will be completed after the specified duration. public static Task StartNewDelayed( this TaskFactory factory, int millisecondsDelay) { return StartNewDelayed(factory, millisecondsDelay, CancellationToken.None); } /// Creates a Task that will complete after the specified delay. /// The TaskFactory. /// The delay after which the Task should transition to RanToCompletion. /// The cancellation token that can be used to cancel the timed task. /// A Task that will be completed after the specified duration and that's cancelable with the specified token. public static Task StartNewDelayed(this TaskFactory factory, int millisecondsDelay, CancellationToken cancellationToken) { // Validate arguments if (factory == null) throw new ArgumentNullException("factory"); if (millisecondsDelay < 0) throw new ArgumentOutOfRangeException("millisecondsDelay"); // Create the timed task var tcs = new TaskCompletionSource(factory.CreationOptions); var ctr = default(CancellationTokenRegistration); // Create the timer but don't start it yet. If we start it now, // it might fire before ctr has been set to the right registration. var timer = new Timer(self => { // Clean up both the cancellation token and the timer, and try to transition to completed ctr.Dispose(); ((Timer)self).Dispose(); tcs.TrySetResult(null); }); // Register with the cancellation token. if (cancellationToken.CanBeCanceled) { // When cancellation occurs, cancel the timer and try to transition to cancelled. // There could be a race, but it's benign. ctr = cancellationToken.Register(() => { timer.Dispose(); tcs.TrySetCanceled(); }); } if (millisecondsDelay > 0) { // Start the timer and hand back the task... timer.Change(millisecondsDelay, Timeout.Infinite); } else { // Just complete the task, and keep execution on the current thread. ctr.Dispose(); tcs.TrySetResult(null); timer.Dispose(); } return tcs.Task; } 

这是我的方法。 它与之前的其他类似,但它不会遭受过度热心的窗口生产问题。

所需的function与Observable.Throttle非常相似,但一旦到达就会发出符合条件的事件,而不是在节流或采样周期的持续时间内延迟。 在符合条件的事件后的给定持续时间内,后续事件将被禁止。

作为可测试的扩展方法给出:

 public static class ObservableExtensions { public static IObservable SampleFirst( this IObservable source, TimeSpan sampleDuration, IScheduler scheduler = null) { scheduler = scheduler ?? Scheduler.Default; return source.Publish(ps => ps.Window(() => ps.Delay(sampleDuration,scheduler)) .SelectMany(x => x.Take(1))); } } 

我们的想法是使用Window的重载,使用windowClosingSelector创建非重叠的窗口, windowClosingSelector使用由sampleDuration时移的源。 因此,每个窗口将:(a)由其中的第一个元素关闭,(b)保持打开直到允许新元素。 然后我们只需从每个窗口中选择第一个元素。

Rx 1.x版本

上面使用的Publish扩展方法在Rx 1.x中不可用。 这是一个替代方案:

 public static class ObservableExtensions { public static IObservable SampleFirst( this IObservable source, TimeSpan sampleDuration, IScheduler scheduler = null) { scheduler = scheduler ?? Scheduler.Default; var sourcePub = source.Publish().RefCount(); return sourcePub.Window(() => sourcePub.Delay(sampleDuration,scheduler)) .SelectMany(x => x.Take(1)); } } 

我经过大量试验和错误后发现的解决方案是使用以下内容替换受限制的订阅:

 subject .Window(() => { return Observable.Interval(timeout); }) .SelectMany(x => x.Take(1)) .Subscribe(i => DoStuff(i)); 

编辑结合保罗的清理工作。

真棒解决方案安德鲁! 我们可以更进一步,清理内部订阅:

 subject .Window(() => { return Observable.Interval(timeout); }) .SelectMany(x => x.Take(1)) .Subscribe(DoStuff); 

我发布的初始答案有一个缺陷:即Window方法与Observable.Interval一起使用来表示窗口的结尾时,会设置一个500ms窗口的无限系列。 我真正需要的是一个窗口,当第一个结果被泵入主体时开始,并在500ms后结束。

我的示例数据掩盖了这个问题,因为数据很好地分解为已经创建的窗口。 (即0-500ms,501-1000ms,1001-1500ms等)

请考虑这个时间:

 factory.StartNewDelayed(300,() => { Console.WriteLine("Batch 1 (300ms delay)"); subject.OnNext(1); }); factory.StartNewDelayed(700, () => { Console.WriteLine("Batch 2 (700ms delay)"); subject.OnNext(2); }); factory.StartNewDelayed(1300, () => { Console.WriteLine("Batch 3 (1.3s delay)"); subject.OnNext(3); }); factory.StartNewDelayed(1600, () => { Console.WriteLine("Batch 4 (1.6s delay)"); subject.OnNext(4); }); 

我得到的是:

批次1(300毫秒延迟)

在356ms处理1

批次2(700毫秒延迟)

在750毫秒处理2

批次3(1.3s延迟)

在1346ms处理3

第4批(1.6秒延迟)

在1644ms处理4

这是因为窗口从0ms,500ms,1000ms和1500ms开始,因此每个Subject.OnNext适合它自己的窗口。

我想要的是:

批次1(300毫秒延迟)

在~300ms处理1

批次2(700毫秒延迟)

批次3(1.3s延迟)

在~1300ms处理3

第4批(1.6秒延迟)

在与同事进行了大量的挣扎和一小时的敲击之后,我们使用纯Rx和单个局部变量找到了更好的解决方案:

 bool isCoolingDown = false; subject .Where(_ => !isCoolingDown) .Subscribe( i => { DoStuff(i); isCoolingDown = true; Observable .Interval(cooldownInterval) .Take(1) .Subscribe(_ => isCoolingDown = false); }); 

我们的假设是对订阅方法的调用是同步的。 如果不是,则可以引入简单的锁定。

我有另一个给你。 这个不使用Repeat()也不使用Interval()所以它可能是你所追求的:

 subject .Window(() => Observable.Timer(TimeSpan.FromMilliseconds(500))) .SelectMany(x => x.Take(1)); 

那么最明显的事情就是在这里使用Repeat()。 但是,据我所知,Repeat()可能会引入问题,因此通知会在流停止和我们再次订阅之间消失。 在实践中,这对我来说从来都不是问题。

 subject .Take(1) .Concat(Observable.Empty().Delay(TimeSpan.FromMilliseconds(500))) .Repeat(); 

请记住替换为源的实际类型。

更新:

更新了查询以使用Concat而不是Merge

在尝试使用.Window重新实现我自己的相同或类似问题的解决方案时,我偶然发现了这个问题。看看,它似乎和这个问题一样,并且非常优雅地解决了:

https://stackoverflow.com/a/3224723/58463

使用.Scan() ! 当我需要立即进行第一次击中(在一段时间之后)时,这就是我用于限制的,但延迟(和组/忽略)任何后续击中。 基本上像Throttle一样工作,但如果之前的onNext是> = interval before,则会立即触发,否则,请按照上次命中的确切interval计划。 当然,如果在“冷却”期间出现多次点击,则会忽略其他点击,就像Throttle所做的那样。 与您的用例的不同之处在于,如果您在0 ms和100 ms处收到事件,它们将被处理(0ms和500ms),这可能是您实际想要的(否则,累加器很容易适应忽略任何打击都比interval更接近前一个)。

 public static IObservable QuickThrottle(this IObservable src, TimeSpan interval, IScheduler scheduler) { return src .Scan(new ValueAndDueTime(), (prev, id) => AccumulateForQuickThrottle(prev, id, interval, scheduler)) .Where(vd => !vd.Ignore) .SelectMany(sc => Observable.Timer(sc.DueTime, scheduler).Select(_ => sc.Value)); } private static ValueAndDueTime AccumulateForQuickThrottle(ValueAndDueTime prev, T value, TimeSpan interval, IScheduler s) { var now = s.Now; // Ignore this completely if there is already a future item scheduled // but do keep the dueTime for accumulation! if (prev.DueTime > now) return new ValueAndDueTime { DueTime = prev.DueTime, Ignore = true }; // Schedule this item at at least interval from the previous var min = prev.DueTime + interval; var nextTime = (now < min) ? min : now; return new ValueAndDueTime { DueTime = nextTime, Value = value }; } private class ValueAndDueTime { public DateTimeOffset DueTime; public T Value; public bool Ignore; }