使用Reactive Extensions进行数据库轮询

我必须及时查询数据库以了解遗留系统的状态。 我想过围绕一个Observable包装查询,但我不知道正确的方法。

基本上,它将每5秒进行一次相同的查询。 但我担心我将不得不面对这些问题:

  • 如果执行查询需要10秒钟怎么办? 如果仍在处理上一个查询,我不想执行任何新查询。
  • 此外,应该有一个超时。 如果当前查询在例如20秒之后未执行,则应记录信息性消息并应发送新的尝试(同一查询)。

额外细节:

  • 查询只是一个SELECT ,它返回一个包含状态代码列表的数据集( 工作出错 )。
  • Observable序列将始终采用从查询接收的最新数据,例如Switch扩展方法。
  • 我想将数据库查询(lenghty操作)包装到一个Task中,但我不确定它是否是最佳选择。

我几乎可以确定查询应该在另一个线程中执行,但是我不知道observable应该是什么样子,曾经阅读过Lee Campbell的Rx简介 。

这是使用Rx轮询另一个系统的相当经典的案例。 大多数人会使用Observable.Interval作为他们的Observable.Interval操作员,对大多数人来说,这样会很好。

但是,您对超时和重试有特定要求。 在这种情况下,我认为你最好使用运算符的组合:

  • Observable.Timer允许您在指定时间内执行查询
  • Timeout以识别已超限的数据库查询
  • ToObservable()将您的Task结果映射到可观察的序列。
  • Retry以允许您在超时后恢复
  • Repeat以允许您在成功进行数据库查询后继续。 这还将保持先前数据库查询的完成与下一个数据库查询的开始之间的初始时段/差距。

这个工作LINQPad代码段应该显示查询正常工作:

 void Main() { var pollingPeriod = TimeSpan.FromSeconds(5); var dbQueryTimeout = TimeSpan.FromSeconds(10); //You will want to have your Rx query timeout after the expected silence of the timer, and then further maximum silence. var rxQueryTimeOut = pollingPeriod + dbQueryTimeout; var scheduler = new EventLoopScheduler(ts => new Thread(ts) { Name = "DatabasePoller" }); var query = Observable.Timer(pollingPeriod, scheduler) .SelectMany(_ => DatabaseQuery().ToObservable()) .Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler) .Retry() //Loop on errors .Repeat(); //Loop on success query.StartWith("Seed") .TimeInterval(scheduler) //Just to debug, print the timing gaps. .Dump(); } // Define other methods and classes here private static int delay = 9; private static int delayModifier = 1; public async Task DatabaseQuery() { //Oscillate the delay between 3 and 12 seconds delay += delayModifier; var timespan = TimeSpan.FromSeconds(delay); if (delay < 4 || delay > 11) delayModifier *= -1; timespan.Dump("delay"); await Task.Delay(timespan); return "Value"; } 

结果如下:

 Seed 00:00:00.0125407 Timeout 00:00:15.0166379 Timeout 00:00:15.0124480 Timeout 00:00:15.0004520 Timeout 00:00:15.0013296 Timeout 00:00:15.0140864 Value 00:00:14.0251731 Value 00:00:13.0231958 Value 00:00:12.0162236 Value 00:00:11.0138606 

样本的关键部分是….

 var query = Observable.Timer(TimeSpan.FromSeconds(5), scheduler) .SelectMany(_ => DatabaseQuery().ToObservable()) .Timeout(rxQueryTimeOut, Observable.Return("Timeout"), scheduler) .Retry() //Loop on errors .Repeat(); //Loop on success 

编辑:这是对如何达成此解决方案的进一步说明。 https://github.com/LeeCampbell/RxCookbook/blob/master/Repository/Polling.md

我认为这是你应该做的:

 var query = from n in Observable.Interval(TimeSpan.FromSeconds(5.0)) from ds in Observable.Amb( Observable.Start(() => /* Your DataSet query */), Observable .Timer(TimeSpan.FromSeconds(10.0)) .Select(_ => new DataSet("TimeOut"))) select ds; 

这会触发一个新查询,执行时间间隔为5秒。 距离最后一个开始不是5秒,距离最后一个结束时间为5秒。

然后你尝试你的查询,但你.Amb它与一个定时器,在10秒后返回一个特殊的数据集。 如果您的查询在10秒之前完成,那么它将获胜,但否则将返回特殊的DataSet.Amb运算符基本上是一个“竞争”运算符 – 第一个产生值获胜的可.Amb