Rx中可推进的历史流和直播
我有一个热门观察,我通常使用下面的普通Subject
实现,以便有兴趣的人可以订阅实时的通知流。
现在我想保留这个实时流,但是还要公开所有事件的历史流,这些事件已经并且具有绝对时间附加到那些通知以了解它们究竟发生的时间因为允许订户将历史流推进到任何在重播年表之前的时间点。
- 我相信大部分可以使用HistoricalScheduler及其AdvanceTo方法实现,但我不确定如何?
- 是否使用Timestamped来节省所需事件的时间?
- ReplaySubject是否需要将实时流缓存到历史记录中,然后可以使用HistoricalScheduler进行回放?
如何为同一来源实施这两个流,或者换句话说,如何将下面的内容用于当前的要求?
[见“重播过去”标题]
HistoricalScheduler
为您提供的是控制调度程序虚拟时间的前向运动的能力。
你没有得到的是随着时间的推移随机访问。 随着虚拟时间的提前,执行预定的操作,因此必须提前安排它们。 过去安排的任何操作 – 即在HistoricalScheduler.Now
值后面的绝对时间 – 立即执行。
要重放事件,您需要以某种方式记录它们,然后使用HistoricalScheduler
的实例安排它们 – 然后提前时间。
当您提前计划时,计划的操作将在其到期时间执行 – 当可观察的事件将OnXXX()
发送给其订户时,调度程序的Now
属性将具有当前的虚拟时间。
每个订户都需要访问它自己的调度程序,以便独立于其他订户控制时间。 这实际上意味着为每个订户创建一个可观察的。
这是一个我敲了一个快速的例子(如果您引用了nuget包rx-main,它将在LINQPad中运行)。
首先,我记录一个实时流(以完全非生产方式!)将事件记录到列表中。 如你所知,使用TimeStamp()
可以很好地捕获时间:
/* record a live stream */ var source = Observable.Interval(TimeSpan.FromSeconds(1)); var log = source.Take(5).Timestamp().ToList().Wait(); Console.WriteLine("Time now is " + DateTime.Now);
现在我们可以使用HistoricalScheduler结合使用Generate来安排事件。 请注意,这种方法可以防止大量预定事件提前排队 – 而不是我们一次只安排一个:
var scheduler = new HistoricalScheduler(); /* set up the scheduling of the recording events */ var replay = Observable.Generate( log.GetEnumerator(), events => events.MoveNext(), events => events, events => events.Current.Value, events => events.Current.Timestamp, scheduler);
现在,当我们订阅时,您可以看到HistoricalScheduler
的Now
属性具有事件的虚拟时间:
replay.Subscribe( i => Console.WriteLine("Event: {0} happened at {1}", i, scheduler.Now));
最后我们可以开始计划(使用Start()只是尝试播放所有事件,而不是使用AdvanceTo
移动到特定时间 – 就像做AdvanceTo(DateTime.MaxValue);
scheduler.Start();
我的输出是:
Time now is 07/01/2014 15:17:27 Event: 0 happened at 07/01/2014 15:17:23 +00:00 Event: 1 happened at 07/01/2014 15:17:24 +00:00 Event: 2 happened at 07/01/2014 15:17:25 +00:00 Event: 3 happened at 07/01/2014 15:17:26 +00:00 Event: 4 happened at 07/01/2014 15:17:27 +00:00
结果是你最终可能不得不通过这个工具创建自己的API来获得适合你特定目的的东西。 它给你留下了相当多的工作 – 但它仍然是非常强大的东西。
有趣的是,实时可观察和重放的可观察对象看起来并没有什么不同 – 只要你记得总是参数化你的调度程序(!) – 这样就可以轻松地在它们上面运行相同的查询,时间查询全部与调度程序的虚拟时间。
我已经使用它来测试旧数据的新查询,以便在商业场景中发挥重要作用。
它没有尝试的是传输控制 ,例如在GUI中来回滚动时间。 通常,您以大块运行历史记录,存储新查询的输出,然后使用此数据随后在GUI中显示,以便用户可以通过您提供的其他一些机制来回移动。
最后,您不需要ReplaySubject
来缓存直播流; 但是你确实需要一些记录事件来重放的方法 – 这可能只是一个写入日志的观察者。