如何基于Func 将IObservable 窗口/缓冲到块中
给出一个类:
class Foo { DateTime Timestamp {get; set;} }
…和一个IObservable
,保证单调增加 Timestamp
s,如何根据这些Timestamp
s生成一个嵌入到列表中的IObservable<IList>
?
即每个IList
应该有五秒钟的事件,或者其他什么。 我知道我可以使用带有TimeSpan
重载的Buffer
,但我需要花时间从事件本身,而不是挂钟。 (除非有一个聪明的方法在这里提供一个IScheduler
,它使用IObservable
本身作为IScheduler
的来源?)
如果我尝试使用Observable.Buffer(this IObservable source, IObservable bufferBoundaries)
重载如下:
IObservable foos = //...; var pub = foos.Publish(); var windows = pub.Select(x => new DateTime( x.Ticks - x.Ticks % TimeSpan.FromSeconds(5).Ticks)).DistinctUntilChanged(); pub.Buffer(windows).Subscribe(x => t.Dump())); // linqpad pub.Connect();
…然后IList
实例包含导致窗口关闭的项目 ,但我真的希望这个项目进入下一个窗口/缓冲区。
例如,使用时间戳[0, 1, 10, 11, 15]
0,1,10,11,15 [0, 1, 10, 11, 15]
您将获得[[0], [1, 10], [11, 15]]
而不是[[0, 1], [10, 11], [15]]
这是一个想法。 组密钥条件是“窗口号”,我使用GroupByUntil
。 这为您提供了示例中所需的输出(我使用了一个类似于该示例的int流 – 但您可以替换为窗口编号所需的任何内容)。
public class Tests : ReactiveTest { public void Test() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(0, 0), OnNext(1, 1), OnNext(10, 10), OnNext(11, 11), OnNext(15, 15), OnCompleted(16, 0)); xs.Publish(ps => // (1) ps.GroupByUntil( p => p / 5, // (2) grp => ps.Where(p => p / 5 != grp.Key)) // (3) .SelectMany(x => x.ToList())) // (4) .Subscribe(Console.WriteLine); scheduler.Start(); } }
笔记
- 我们发布源流,因为我们将多次订阅。
- 这是一个创建组密钥的function – 使用此function从项目类型生成窗口编号。
- 这是组终止条件 – 使用它来检查另一个窗口中项目的源流。 请注意,这意味着窗口在其外部的元素到达或源流终止之前不会关闭。 如果你考虑一下这很明显 – 你需要的输出需要在窗口结束后考虑下一个元素。 请注意,如果您的源与实时有任何关系,您可以将其与
Observable.Timer+Select
合并,以输出您的术语的null / default实例以更早地终止该流。 - SelectMany将组放入列表并展平流。
如果你包含nuget包rx-testing,这个例子将很好地在LINQPad中运行。 新建一个Tests实例,然后运行Test()
方法。
Window
是Buffer
的泛化, GroupJoin
是Window
(和Join
)的泛化。 当您编写Window
或Buffer
查询并且发现通知被错误地包含在窗口/列表的边缘或从窗口/列表的边缘排除时,然后根据GroupJoin
重新定义查询以控制边缘通知到达的位置。
请注意,为了使关闭通知可用于新打开的窗口,您必须将边界定义为这些通知的窗口(窗口数据,而不是边界数据)。 在您的情况下,您不能使用一系列DateTime值作为边界,您必须使用一系列Foo对象。 为实现此目的,我已将Scan
– > Where
– > Select
查询替换为Select
– > DistinctUntilChanged
查询。
var batches = foos.Publish(publishedFoos => publishedFoos .Scan( new { foo = (Foo)null, last = DateTime.MinValue, take = true }, (acc, foo) => { var boundary = foo.Timestamp - acc.last >= TimeSpan.FromSeconds(5); return new { foo, last = boundary ? foo.Timestamp : acc.last, take = boundary }; }) .Where(a => a.take) .Select(a => a.foo) .Publish(boundaries => boundaries .Skip(1) .StartWith((Foo)null) .GroupJoin( publishedFoos, foo => foo == null ? boundaries.Skip(1) : boundaries, _ => Observable.Empty(), (foo, window) => (foo == null ? window : window.StartWith(foo)).ToList()))) .Merge() .Replay(lists => lists.SkipLast(1) .Select(list => list.Take(list.Count - 1)) .Concat(lists), bufferSize: 1);
只有当您希望序列最终结束并且您不关心不丢弃最后一个通知时,才需要最后的Replay
查询; 否则,你可以简单地修改window.StartWith(foo)
到window.StartWith(foo).SkipLast(1)
来实现相同的基本结果,虽然最后一个缓冲区的最后通知会丢失。
我认为詹姆斯世界的答案更整洁/更具可读性,但对于后人,我发现了使用Buffer()
另一种方法:
IObservable foos = //...; var pub = foos.Publish(); var windows = pub.Select(x => new DateTime( x.Ticks - x.Ticks % TimeSpan.FromSeconds(5).Ticks)) .DistinctUntilChanged().Publish.RefCount(); pub.Buffer(windows, x => windows).Subscribe(x => t.Dump())); pub.Connect();
凭借1000万次赛事,詹姆斯的进近速度超过2.5倍(我的机器为20秒对56秒)。