如何基于Func 将IObservable 窗口/缓冲到块中

给出一个类:

class Foo { DateTime Timestamp {get; set;} } 

…和一个IObservable ,保证单调增加 Timestamp s,如何根据这些Timestamp s生成一个嵌入到列表中的IObservable<IList>

即每个IList应该有五秒钟的事件,或者其他什么。 我知道我可以使用带有TimeSpan重载的Buffer ,但我需要花时间从事件本身,而不是挂钟。 (除非有一个聪明的方法在这里提供一个IScheduler ,它使用IObservable本身作为IScheduler的来源?)

如果我尝试使用Observable.Buffer(this IObservable source, IObservable bufferBoundaries)重载如下:

 IObservable foos = //...; var pub = foos.Publish(); var windows = pub.Select(x => new DateTime( x.Ticks - x.Ticks % TimeSpan.FromSeconds(5).Ticks)).DistinctUntilChanged(); pub.Buffer(windows).Subscribe(x => t.Dump())); // linqpad pub.Connect(); 

…然后IList实例包含导致窗口关闭的项目 ,但我真的希望这个项目进入下一个窗口/缓冲区。

例如,使用时间戳[0, 1, 10, 11, 15] 0,1,10,11,15 [0, 1, 10, 11, 15]您将获得[[0], [1, 10], [11, 15]]而不是[[0, 1], [10, 11], [15]]

这是一个想法。 组密钥条件是“窗口号”,我使用GroupByUntil 。 这为您提供了示例中所需的输出(我使用了一个类似于该示例的int流 – 但您可以替换为窗口编号所需的任何内容)。

 public class Tests : ReactiveTest { public void Test() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(0, 0), OnNext(1, 1), OnNext(10, 10), OnNext(11, 11), OnNext(15, 15), OnCompleted(16, 0)); xs.Publish(ps => // (1) ps.GroupByUntil( p => p / 5, // (2) grp => ps.Where(p => p / 5 != grp.Key)) // (3) .SelectMany(x => x.ToList())) // (4) .Subscribe(Console.WriteLine); scheduler.Start(); } } 

笔记

  1. 我们发布源流,因为我们将多次订阅。
  2. 这是一个创建组密钥的function – 使用此function从项目类型生成窗口编号。
  3. 这是组终止条件 – 使用它来检查另一个窗口中项目的源流。 请注意,这意味着窗口在其外部的元素到达或源流终止之前不会关闭。 如果你考虑一下这很明显 – 你需要的输出需要在窗口结束后考虑下一个元素。 请注意,如果您的源与实时有任何关系,您可以将其与Observable.Timer+Select合并,以输出您的术语的null / default实例以更早地终止该流。
  4. SelectMany将组放入列表并展平流。

如果你包含nuget包rx-testing,这个例子将很好地在LINQPad中运行。 新建一个Tests实例,然后运行Test()方法。

WindowBuffer的泛化, GroupJoinWindow (和Join )的泛化。 当您编写WindowBuffer查询并且发现通知被错误地包含在窗口/列表的边缘或从窗口/列表的边缘排除时,然后根据GroupJoin重新定义查询以控制边缘通知到达的位置。

请注意,为了使关闭通知可用于新打开的窗口,您必须将边界定义为这些通知的窗口(窗口数据,而不是边界数据)。 在您的情况下,您不能使用一系列DateTime值作为边界,您必须使用一系列Foo对象。 为实现此目的,我已将Scan – > Where – > Select查询替换为Select – > DistinctUntilChanged查询。

 var batches = foos.Publish(publishedFoos => publishedFoos .Scan( new { foo = (Foo)null, last = DateTime.MinValue, take = true }, (acc, foo) => { var boundary = foo.Timestamp - acc.last >= TimeSpan.FromSeconds(5); return new { foo, last = boundary ? foo.Timestamp : acc.last, take = boundary }; }) .Where(a => a.take) .Select(a => a.foo) .Publish(boundaries => boundaries .Skip(1) .StartWith((Foo)null) .GroupJoin( publishedFoos, foo => foo == null ? boundaries.Skip(1) : boundaries, _ => Observable.Empty(), (foo, window) => (foo == null ? window : window.StartWith(foo)).ToList()))) .Merge() .Replay(lists => lists.SkipLast(1) .Select(list => list.Take(list.Count - 1)) .Concat(lists), bufferSize: 1); 

只有当您希望序列最终结束并且您不关心不丢弃最后一个通知时,才需要最后的Replay查询; 否则,你可以简单地修改window.StartWith(foo)window.StartWith(foo).SkipLast(1)来实现相同的基本结果,虽然最后一个缓冲区的最后通知会丢失。

我认为詹姆斯世界的答案更整洁/更具可读性,但对于后人,我发现了使用Buffer()另一种方法:

 IObservable foos = //...; var pub = foos.Publish(); var windows = pub.Select(x => new DateTime( x.Ticks - x.Ticks % TimeSpan.FromSeconds(5).Ticks)) .DistinctUntilChanged().Publish.RefCount(); pub.Buffer(windows, x => windows).Subscribe(x => t.Dump())); pub.Connect(); 

凭借1000万次赛事,詹姆斯的进近速度超过2.5倍(我的机器为20秒对56秒)。