如何使用RX限制事件流?
我想有效地限制事件流,以便在收到第一个事件时调用我的委托,但如果接收到后续事件则不会持续1秒。 在超时(1秒)到期后,如果收到后续事件,我希望调用我的代理。
使用Reactive Extensions有一种简单的方法吗?
示例代码:
static void Main(string[] args) { Console.WriteLine("Running..."); var generator = Observable .GenerateWithTime(1, x => x x, x => TimeSpan.FromMilliseconds(1), x => x + 1) .Timestamp(); var builder = new StringBuilder(); generator .Sample(TimeSpan.FromSeconds(1)) .Finally(() => Console.WriteLine(builder.ToString())) .Subscribe(feed => builder.AppendLine(string.Format("Observed {0:000}, generated at {1}, observed at {2}", feed.Value, feed.Timestamp.ToString("mm:ss.fff"), DateTime.Now.ToString("mm:ss.fff")))); Console.ReadKey(); }
当前输出:
Running... Observed 064, generated at 41:43.602, observed at 41:43.602 Observed 100, generated at 41:44.165, observed at 41:44.602
但我想观察(时间戳显然会改变)
Running... Observed 001, generated at 41:43.602, observed at 41:43.602 .... Observed 100, generated at 41:44.165, observed at 41:44.602
这是我在RX论坛的帮助下得到的:
我们的想法是为原始序列发布一系列“门票”。 这些“门票”在超时时被延迟,不包括第一个,它立即预先加到票证序列。 当一个事件进来并且有一个等待的票时,事件立即触发,否则它等到票然后开火。 当它发射时,发出下一张票,依此类推……
要结合票证和原始事件,我们需要一个组合器。 不幸的是,“标准”.CombineLatest不能在这里使用,因为它会触发之前使用过的门票和事件。 因此,我必须创建自己的组合器,它基本上是一个过滤的.CombineLatest,仅当组合中的两个元素都是“新鲜”时才会触发 – 之前从未返回过。 我称之为.CombineVeryLatest aka .BrokenZip;)
使用.CombineVeryLatest,上面的想法可以这样实现:
public static IObservable SampleResponsive ( this IObservable source, TimeSpan delay) { return source.Publish(src => { var fire = new Subject (); var whenCanFire = fire .Select(u => new Unit()) .Delay(delay) .StartWith(new Unit()); var subscription = src .CombineVeryLatest(whenCanFire, (x, flag) => x) .Subscribe(fire); return fire.Finally(subscription.Dispose); }); } public static IObservable CombineVeryLatest (this IObservable leftSource, IObservable rightSource, Func selector) { var ls = leftSource.Select(x => new Used(x)); var rs = rightSource.Select(x => new Used(x)); var cmb = ls.CombineLatest(rs, (x, y) => new { x, y }); var fltCmb = cmb .Where(a => !(axIsUsed || ayIsUsed)) .Do(a => { axIsUsed = true; ayIsUsed = true; }); return fltCmb.Select(a => selector(axValue, ayValue)); } private class Used { internal T Value { get; private set; } internal bool IsUsed { get; set; } internal Used(T value) { Value = value; } }
编辑:这是AndreasKöpf在论坛上提出的另一个更紧凑的CombineVeryLatest变体:
public static IObservable CombineVeryLatest (this IObservable leftSource, IObservable rightSource, Func selector) { return Observable.Defer(() => { int l = -1, r = -1; return Observable.CombineLatest( leftSource.Select(Tuple.Create), rightSource.Select(Tuple.Create), (x, y) => new { x, y }) .Where(t => txItem2 != l && tyItem2 != r) .Do(t => { l = txItem2; r = tyItem2; }) .Select(t => selector(txItem1, tyItem1)); }); }
好的,
你有3个场景:
1)我希望每秒获得一个事件流的值。 意味着:如果它每秒产生更多事件,你将得到一个总是更大的缓冲区。
observableStream.Throttle(timeSpan)
2)我想得到最新的事件,这是在第二次发生之前产生的意思:其他事件被丢弃。
observableStream.Sample(TimeSpan.FromSeconds(1))
3)你想得到所有事件,发生在最后一秒。 那一秒钟
observableStream.BufferWithTime(timeSpan)
4)你想要选择在第二个与所有值之间发生的事情,直到第二个已经过去,并返回你的结果
observableStream.CombineLatest(Observable.Interval(1000), selectorOnEachEvent)
我昨晚在同样的问题上苦苦挣扎,相信我找到了一个更优雅(或至少更短)的解决方案:
var delay = Observable.Empty().Delay(TimeSpan.FromSeconds(1)); var throttledSource = source.Take(1).Concat(delay).Repeat();
这是我在Rx论坛中发布的这个问题的答案:
更新 :这是一个新版本,当事件发生的时间差超过一秒时,不再延迟事件转发:
public static IObservable ThrottleResponsive3 (this IObservable source, TimeSpan minInterval) { return Observable.CreateWithDisposable (o => { object gate = new Object(); Notification last = null, lastNonTerminal = null; DateTime referenceTime = DateTime.UtcNow - minInterval; var delayedReplay = new MutableDisposable(); return new CompositeDisposable(source.Materialize().Subscribe(x => { lock (gate) { var elapsed = DateTime.UtcNow - referenceTime; if (elapsed >= minInterval && delayedReplay.Disposable == null) { referenceTime = DateTime.UtcNow; x.Accept(o); } else { if (x.Kind == NotificationKind.OnNext) lastNonTerminal = x; last = x; if (delayedReplay.Disposable == null) { delayedReplay.Disposable = Scheduler.ThreadPool.Schedule(() => { lock (gate) { referenceTime = DateTime.UtcNow; if (lastNonTerminal != null && lastNonTerminal != last) lastNonTerminal.Accept(o); last.Accept(o); last = lastNonTerminal = null; delayedReplay.Disposable = null; } }, minInterval - elapsed); } } } }), delayedReplay); }); }
这是我之前的尝试:
var source = Observable.GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1) .Timestamp(); source.Publish(o => o.Take(1).Merge(o.Skip(1).Sample(TimeSpan.FromSeconds(1))) ).Run(x => Console.WriteLine(x));
好的,这是一个解决方案。 我不喜欢它,特别是,但是……哦,好吧。
帽子向Jon提示我指向SkipWhile,向cRichter指向BufferWithTime。 多谢你们。
static void Main(string[] args) { Console.WriteLine("Running..."); var generator = Observable .GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1) .Timestamp(); var bufferedAtOneSec = generator.BufferWithTime(TimeSpan.FromSeconds(1)); var action = new Action>( feed => Console.WriteLine("Observed {0:000}, generated at {1}, observed at {2}", feed.Value, feed.Timestamp.ToString("mm:ss.fff"), DateTime.Now.ToString("mm:ss.fff"))); var reactImmediately = true; bufferedAtOneSec.Subscribe(list => { if (list.Count == 0) { reactImmediately = true; } else { action(list.Last()); } }); generator .SkipWhile(item => reactImmediately == false) .Subscribe(feed => { if(reactImmediately) { reactImmediately = false; action(feed); } }); Console.ReadKey(); }
你试过Throttle
扩展方法吗?
来自文档:
忽略来自可观察序列的值,然后在dueTime之前跟随另一个值
我不太清楚这是否会做你想做的事情 – 因为你想忽略下面的值而不是第一个值…但我希望它是你想要的。 试试看 :)
编辑:嗯……不,毕竟我不认为Throttle
是正确的。 我相信我明白了你想做什么,但我在框架中看不到任何东西。 我可能错过了一些东西。 你有没有问过Rx论坛? 很可能如果现在不存在,他们很乐意添加它:)
我怀疑你可以用SkipUntil
和SelectMany
以某种方式巧妙地做到这一点……但我认为应该采用自己的方法。
您正在寻找的是CombineLatest。
public static IObservable CombineLatest( IObservable leftSource, IObservable rightSource, Func selector )
当选择器(时间)有值时,它合并2个obeservable并返回所有值。
编辑:约翰是对的,这可能不是首选的解决方案
受Bluelings答案的启发我在这里提供了一个与Reactive Extensions 2.2.5一起编译的版本。
此特定版本计算样本数,并提供最后一个采样值。 为此,使用以下类:
class Sample { public Sample(T lastValue, Int32 count) { LastValue = lastValue; Count = count; } public T LastValue { get; private set; } public Int32 Count { get; private set; } }
这是运营商:
public static IObservable> SampleResponsive(this IObservable source, TimeSpan interval, IScheduler scheduler = null) { if (source == null) throw new ArgumentNullException(nameof(source)); return Observable.Create>( observer => { var gate = new Object(); var lastSampleValue = default(T); var lastSampleTime = default(DateTime); var sampleCount = 0; var scheduledTask = new SerialDisposable(); return new CompositeDisposable( source.Subscribe( value => { lock (gate) { var now = DateTime.UtcNow; var elapsed = now - lastSampleTime; if (elapsed >= interval) { observer.OnNext(new Sample (value, 1)); lastSampleValue = value; lastSampleTime = now; sampleCount = 0; } else { if (scheduledTask.Disposable == null) { scheduledTask.Disposable = (scheduler ?? Scheduler.Default).Schedule( interval - elapsed, () => { lock (gate) { if (sampleCount > 0) { lastSampleTime = DateTime.UtcNow; observer.OnNext(new Sample (lastSampleValue, sampleCount)); sampleCount = 0; } scheduledTask.Disposable = null; } } ); } lastSampleValue = value; sampleCount += 1; } } }, error => { if (sampleCount > 0) observer.OnNext(new Sample (lastSampleValue, sampleCount)); observer.OnError(error); }, () => { if (sampleCount > 0) observer.OnNext(new Sample (lastSampleValue, sampleCount)); observer.OnCompleted(); } ), scheduledTask ); } ); }