以固定或最小间隔处理Rx事件

我有一系列事件,每10-1000毫秒发生一次。 我订阅了这个事件来源,但希望以500ms的固定(或最小)间隔处理它们。 我也想一次处理一个事件,而不是批量处理(比如Buffer(x> 1))。

伪代码中有类似的东西:

observable.MinimumInterval(TimeSpan.FromMiliseconds(500)).Subscribe(v=>...); 

试过例如:

 observable.Buffer(1).Delay(TimeSpan.FromMiliseconds(500).Subscribe(v=>...); 

以及许多其他潜在的解决方案。 到目前为止没有运气。

有任何想法吗?

我在这里的博客上回答了这个问题。

通过添加呈现作为扩展方法来再现(在链接腐烂的情况下!):

将Rx中的事件流约束到最大速率

有时,您希望限制事件从Rx流到达的速率。

如果另一个事件在指定的时间间隔内到达,则Throttle操作符将抑制该事件。 这在许多情况下非常有用,但它确实有两个重要的副作用 – 即使未压缩的事件也会被间隔延迟,如果事件太快到达,事件将完全丢失。

我遇到了这两种情况都不可接受的情况。 在这种特殊情况下,所需的行为如下:事件应以TimeSpan指定的最大速率输出,否则应尽快输出。

一种解决方案就是这样。 想象一下,我们的输入流是一群人到达火车站。 对于我们的输出,我们希望人们以最高速度离开车站。 我们通过让每个人站在平板铁路卡车的前面并以固定速度将该卡车送出车站来设定最大费率。 因为只有一条轨道,并且所有卡车以相同的速度行驶并且具有相同的长度,所以当卡车背对背地离开时,人们将以最大速率离开车站。 但是,如果赛道清晰,下一个人将能够立即离开。

那么我们如何将这个比喻翻译成Rx呢?

我们将使用Concat运营商接受流的流并将它们背靠背地合并在一起 – 就像在铁轨上发送铁路卡车一样。

为了将每个人的等价物放到铁路卡车上,我们将使用选择将每个事件(人)投射到以单个OnNext事件(人员)开头的可观察序列(铁路卡车)并以完全结束的OnComplete结束稍后定义的间隔。

让我们假设输入事件是变量输入中的IObservable。 这是代码:

 var paced = input.Select(i => Observable.Empty() .Delay(interval) .StartWith(i)).Concat(); 

作为一种扩展方法,这将成为:

 public static IObservable Pace(this IObservable source, TimeSpan interval) { return source.Select(i => Observable.Empty() .Delay(interval) .StartWith(i)).Concat(); } 

既然你想要保留所有事件,我认为你已经使用Buffer 。 但你应该用TimeSpan调用它…

 observable.Buffer(TimeSpan.FromMiliseconds(500)).Subscribe(v=>...); 

…其中v是可以循环的IList

只要有1个事件,你的Buffer(1)原始调用就会触发,就像它根本不存在一样。 使用时间窗口缓冲将收集在该间隔内触发的所有事件,并在每个间隔结束时将其提供给您。

这是我的尝试:

  public static IObservable MinimumInterval(this IObservable source, TimeSpan rate, IScheduler scheduler = null) { if (scheduler == null) scheduler = TaskPoolScheduler.Default; Func, IDisposable> subscribe = obs => { var nextTick = scheduler.Now; var subscriptions = new CompositeDisposable(); Action onNext = value => { var sendTime = Max(nextTick, scheduler.Now); var disp = new SingleAssignmentDisposable(); disp.Disposable = scheduler.Schedule(sendTime, () => { subscriptions.Remove(disp); obs.OnNext(value); }); subscriptions.Add(disp); nextTick = sendTime + rate; }; Action onError = err => { subscriptions.Dispose(); obs.OnError(err); }; Action onCompleted = () => { subscriptions.Dispose(); obs.OnCompleted(); }; var listener = Observer.Create(onNext, onError, onCompleted); subscriptions.Add(source.Subscribe(listener)); return subscriptions; }; return Observable.Create(subscribe); } 

它跟踪最早发送下一条消息的时间,并使用调度程序在事件发生得太快时延迟事件。 CompositeDisposable确保在侦听器取消订阅时取消计划事件。

欢迎提供建设性的反馈。

有一个Throttle扩展方法应该是你想要实现的。

试试这个

 var interval = Observable.Timer(TimeSpan.FromMilliseconds(500)).IgnoreElements(); var observable2 = observable .Select(e => Observable.Return(e).Concat(interval)) .Concat(); observable2.Subscribe(e => { // will have a minimum interval of 500ms between calls });