如何在一段时间内对一个流进行分区(GroupBy)并监视Rx中元素的缺失?

前几天我一直在尝试编写一个Rx查询来处理来自源的事件流,并检查是否缺少某些ID。 定义缺席使得存在一系列时间窗口(例如,在从9:00到17:00的所有日期),在该时间窗口期间应该存在最多20分钟而没有在流中出现ID。 更复杂的是,应根据身份certificate确定缺勤时间。 例如,假设出现在事件的组合流(A,A,B,C,A,C,B等)中的三种事件A,B和C,可以定义为

  • 活动将在每天的9:00至10:00进行监控,最多不超过10分钟。
  • B事件每天从9:00到11:00进行监控,最多没有事件为5分钟。
  • C事件的监控时间为每天的12:00至15:00,最多没有事件为30分钟。

我想我需要首先对流进行分区以通过GroupBy分离事件,然后使用缺少规则处理生成的单独流。 我已经在Microsoft Rx论坛上对此进行了一些考虑(非常感谢Dave),我有一些工作代码可以生成规则并进行缺失检查,但是我很挣扎,例如,如何将其与分组相结合。

所以,没有进一步的演讲,迄今为止被黑客攻击的代码:

//Some sample data bits representing the events. public class FakeData { public int Id { get; set; } public string SomeData { get; set; } } //Note the Now part in DateTime to zero the clock time and have only the date. The purpose is to create start-end pairs of times, eg 9:00-17:00. //The alarm start and end time points should match themselves pairwise, could be pairs of values... var maxDate = DateTime.Now.Date.AddHours(17).AddMinutes(0).AddSeconds(0).AddDays(14); var startDate = DateTime.Now.Date.AddHours(9).AddMinutes(0).AddSeconds(0); var alarmStartPeriods = Enumerable.Range(0, 1 + (maxDate - startDate).Days).Select(d => new DateTimeOffset(startDate.AddDays(d))).ToList(); var alarmEndPeriods = Enumerable.Range(0, 1 + (maxDate - startDate).Days).Select(d => new DateTimeOffset(startDate.AddDays(d)).AddHours(5)).ToList(); 

并且在没有分组的情况下进行缺勤检查的查询,这是我的一个难点。 <编辑:也许我应该将时间点分组成对并添加一个ID并在查询中使用生成的三元组…

 dataSource = from n in Observable.Interval(TimeSpan.FromMilliseconds(100)) select new FakeData { Id = new Random().Next(1, 5), SomeData = DateTimeOffset.Now.ToString() }; var startPointOfTimeChanges = alarmStartPeriods.ToObservable(); var endPointOfTimeChanges = alarmEndPeriods.ToObservable(); var durations = startPointOfTimeChanges.CombineLatest(endPointOfTimeChanges, (start, end) => new { start, end }); var maximumInactivityTimeBeforeAlarmSignal = TimeSpan.FromMilliseconds(250); timer = (from duration in durations select (from _ in Observable.Timer(DateTime.Now) from x in dataSource.Throttle(maximumInactivityTimeBeforeAlarmSignal).TakeUntil(duration.end) select x)).Switch(); timer.Subscribe(x => Debug.WriteLine(x.SomeData)); 

问题:

  • 我应该如何通过ID尝试GroupBy传入的数据,并仍然能够定义事件的缺席?
  • 我注意到的一件事是,如果报警周期的起点是过去的(例如,查询是在10:00开始的,当规则说明在9:00开始监控时),查询将不会启动。 我想,开始时间应该推到现在的时间。 有没有一些标准方法可以做到这一点,还是我应该引入条件?

我能想到的其他问题会很好(为了娱乐自己):):

  • 如何利用每个ID发生的最新事件?
  • 如何动态更改变量(正如Dave已经在MS论坛中提到的那样)?
  • 然后,最后,批处理事件并存储在某个地方(例如数据库),就像在PeteGoo博客中的这个奇妙的例子一样?

我能想到的其他选择是明确使用

 System.Threading.Timers 

 ConcurrentDictionary 

,但需要继续学习!

关于詹姆斯的输入答案,这里是快速解释它如何工作以及我打算如何使用它。

首先,observable在第一个事件进入之前不会做任何事情。因此,如果监控应立即开始,则需要添加一些其他Rxfunction或触发虚拟事件。 我相信不是问题。

其次,将从alarmInterval为任何新ID获取新的超时变量。 在这里,新的甚至是一个已经缺席太久而引发了警报的人。

我认为这很有效,因为人们可以订阅这个可观察的东西并做一些副作用。 一些例子可能是设置标志,发送信号以及业务规则有哪些。 此外,保持适当的锁定等等,应该很容易根据预定义的警报规则提供新的时间跨度,并且具有分开的缺席时间和时间窗口。

我将不得不研究与此相关的其他概念,以便更好地掌握事物。 但我的主要担忧是满意的。 生活很美好。 🙂

编辑 – 改进了代码,简化了SelectMany以使用TakeLast

我写了一篇关于检测断开连接的客户端的博客文章 – 如果您使用下面的alarmInterval函数替换post中的timeToHold变量来获取基于客户端ID的限制时间,那么这对于您的场景也同样适用。

例如:

 // idStream is an IObservable of the input stream of IDs // alarmInterval is a Func that gets the interval given the ID var idAlarmStream = idStream .GroupByUntil(key => key, grp => grp.Throttle(alarmInterval(grp.Key))) .SelectMany(grp => grp.TakeLast(1)); 

这为您提供了持续监控的基本function,而无需查看活动监控周期。

为了获得监视器窗口的function,我转过来并使用WHERE过滤上面的输出,检查以查看发出的ID是否落在监视时间窗口中。 这使得更容易处理不断变化的监控周期。

您可以通过将每个监控窗口转换为流并将其与警报流相结合来做更高级的事情,但我不相信额外复杂性的好处。

alarmInterval函数还将为您提供动态警报间隔的元素,因为它可以返回新值,但这些仅在警报因该ID发生故障后生效,从而结束其当前组。

—走进这里的一些理论—

为了让这个完全充满活力,你必须以某种方式结束这个小组 – 你可以通过几种方式来做到这一点。

一种方法是使用Select将idSt投影到一个自定义类型的流中,该类包含ID加上全局计数器值。 为此类型提供适当的相等实现,以便它可以正确地使用GroupByUntil。

现在每次更改警报间隔时,请更改计数器。 这将导致为每个ID创建新组。 然后,您可以在最终filter中添加一个额外的检查,以确保输出事件具有最新的计数器值。