如何在Rx中交替缓冲和流动实时数据流

我有两个流。 一个是数据流(可以是任何类型),另一个是作为门的布尔流。 我需要将这些组合成一个具有以下行为的流:

  • 当门打开(最近的值为真)时,数据应该直接流过
  • 当门关闭(最近的值为假)时,应该缓冲数据,以便在下一次打开门时作为单独的元素释放
  • 解决方案应保留数据的所有元素并保留顺序

我真的不确定如何把它放在一起。 我一直在测试的输入是这样的:

// a demo data stream that emits every second var dataStream = Observable.Interval(TimeSpan.FromSeconds(1)); // a demo flag stream that toggles every 5 seconds var toggle = false; var gateStream = Observable.Interval(TimeSpan.FromSeconds(5)) .Select(_ => toggle = !toggle); 

我会这样做如下:

  • 使用门流作为关闭选择器来窗口化数据流
  • 我们可以在门流上使用DistinctUntilChanged以确保没有重复的值
  • 我们还将强制门流开始关闭(假) – 它不会影响输出并允许一个巧妙的技巧
  • 然后使用Select的重载,为每个元素提供一个索引号。 有了这个,我们可以判断我们是否需要缓冲或只是按原样发出窗口,因为我们知道偶数窗口用于缓冲(因为我们确保门流以false开头)
  • 我们可以使用ToList()来缓冲每个偶数窗口直到它关闭 – 这实际上等同于一个等待直到OnCompletedBuffer()
  • 我们使用标识SelectMany来展平缓冲的窗口
  • 最后,我们连接窗口以保证订单被保留

它看起来像这样:

 dataStream.Window(gateStream.StartWith(false).DistinctUntilChanged()) .Select((w, i) => i % 2 == 0 ? w.ToList().SelectMany(x => x) : w) .Concat() .Subscribe(Console.WriteLine);