学习Rx:如何在.Window的输出上使用.Scan来获得可观察的bool值序列

我有一系列真假值,如此

var alternatingTrueFalse = Observable.Generate( true, _ => true, x => !x, x => x, _ => TimeSpan.FromMilliseconds(new Random().Next(2000))) .Take(20).Publish(); alternatingTrueFalse.Connect(); var buffered = alternatingTrueFalse .Buffer(TimeSpan.FromMilliseconds(500)) .Subscribe(x => Console.WriteLine($"{TimeStamp} {string.Join(",", x)}")); 

我想用500毫秒(最大)窗口/缓冲区来查看序列。 如果在一个这样的窗口中只有一个真值(没有别的),我想翻转一个开关(只需调用一个命令,现在打印到控制台)。 然后,当下一个假值到达时,我想要向后翻转开关并关闭原始序列的当前窗口/缓冲区并开始一个新的。

使用Buffer + Scan翻转开关

到目前为止,我已经想出了一种在Buffer上做到这一点的方法。 但是,缓冲区打开时间过长,始终为500毫秒。

  var buffered = alternatingTrueFalse .Buffer(TimeSpan.FromMilliseconds(500)); var output = buffered .Subscribe(x => Console.WriteLine($"{TimeStamp} {string.Join(",", x)}")); var isFlipped = buffered.Scan(false, (x, y) => { if (y.Count == 0) { return x; } return y.Count == 1 && y.First(); }); isFlipped.DumpTimes("Flipped"); 

我试图弄清楚如何使用Window而不是Buffer来在一个隔离的true之后将第一个false的开关翻转回来。 但我似乎无法做到正确,我对Rx还不是很流利,也不确定如何使用windowOpening / Closing值。

示例输出

原版的

 2017-10-07 20:21:39.302 True,False // Rapid values should not flip the switch (actually they should flip a different switch) 2017-10-07 20:21:39.797 True // Flip the switch here 2017-10-07 20:21:40.302 False // Flip the switch back and close the current window 2017-10-07 20:21:40.797 True // Flip the switch here 2017-10-07 20:21:41.297 2017-10-07 20:21:41.798 False // Etc... ... 2017-10-07 20:21:43.297 True 2017-10-07 20:21:43.800 False,True // Example of a window that is open too long, because it is not closed immediately upon the false value ... 

缓冲+扫描

 2017-10-07 20:47:15.154 True 2017-10-07 20:47:15.163 - Flipped-->True 2017-10-07 20:47:15.659 False,True // Example of a window open too long 2017-10-07 20:47:15.661 - Flipped-->False 

这是一种不使用Scan方法的解决方案。

问题似乎是基于两个条件关闭缓冲区 – 最大时间或特定值。 这是基于一个旧的答案

 public static IObservable> BufferWithClosingValue( this IObservable source, TimeSpan maxTime, TSource closingValue) { return source.GroupByUntil(_ => true, g => g.Where(i => i.Equals(closingValue)).Select(_ => Unit.Default) .Merge(Observable.Timer(maxTime).Select(_ => Unit.Default))) .SelectMany(i => i.ToList()); } 

示例用法是

 alternatingTrueFalse.BufferWithClosingValue( TimeSpan.FromMilliseconds(500), false ); 

我建议创建不基于其他运算符的自定义运算符,因为它将占用更多的CPU和内存。

这是同一方法的干净版本。

 public static IObservable> BufferWithThrottle(this IObservable @this, int maxAmount, TimeSpan threshold) { var buffer = new List(); return Observable.Create>(observer => { var aTimer = new Timer(); void Clear() { aTimer.Stop(); buffer.Clear(); } void OnNext() { observer.OnNext(buffer); Clear(); } aTimer.Interval = threshold.TotalMilliseconds; aTimer.Enabled = true; aTimer.Elapsed += (sender, args) => OnNext(); var subscription = @this.Subscribe(value => { buffer.Add(value); if (buffer.Count >= maxAmount) OnNext(); else { aTimer.Stop(); aTimer.Start(); } }); return Disposable.Create(() => { Clear(); subscription.Dispose(); }); }); }