如何使用Observable实现轮询?

我有一个参数化的rest调用应该每隔五秒用不同的参数执行:

Observable restCall = api.method1(param1); 

我需要创建一个Observable ,它将使用param1的不同值每5秒轮询一次restCall。 如果api调用失败,我需要收到错误并在5秒内进行下一次调用。 应仅在restCall完成时(成功/错误)测量调用之间的间隔。

我目前正在使用RxJava,但.NET示例也会很好。

介绍

首先,承认,我是一个.NET人,我知道这种方法使用了一些在Java中没有直接等价的习语。 但是我正在接受你的话,并且继续这样做,这是一个.NET家伙会喜欢的一个很好的问题,希望它能引导你走在rx-java的正确道路上,这是我从未看过的。 这是一个很长的答案,但它主要是解释 – 解决方案代码本身很短!

使用Either

我们需要先排序一些工具来帮助解决这个问题。 第一种是使用Either类型。 这很重要,因为每次调用都有两个可能的结果:好结果或错误。 但是我们需要将它们包装在一个类型中 – 我们不能使用OnError来发送错误,因为这会终止结果流。 要么看起来有点像元组,并且更容易处理这种情况。 Rxx库有一个非常完整和良好的Either实现,但这里有一个简单的通用示例,后面是一个简单的实现,适用于我们的目的:

 var goodResult = Either.Right(1); var exception = Either.Left(new Exception()); /* base class for LeftValue and RightValue types */ public abstract class Either { public abstract bool IsLeft { get; } public bool IsRight { get { return !IsLeft; } } public abstract TLeft Left { get; } public abstract TRight Right { get; } } public static class Either { public sealed class LeftValue : Either { TLeft _leftValue; public LeftValue(TLeft leftValue) { _leftValue = leftValue; } public override TLeft Left { get { return _leftValue; } } public override TRight Right { get { return default(TRight); } } public override bool IsLeft { get { return true; } } } public sealed class RightValue : Either { TRight _rightValue; public RightValue(TRight rightValue) { _rightValue = rightValue; } public override TLeft Left { get { return default(TLeft); } } public override TRight Right { get { return _rightValue; } } public override bool IsLeft { get { return false; } } } // Factory functions to create left or right-valued Either instances public static Either Left(TLeft leftValue) { return new LeftValue(leftValue); } public static Either Right(TRight rightValue) { return new RightValue(rightValue); } } 

请注意,按照惯例,当使用Either建模成功或失败时,右侧用于成功的值,因为它当然是“正确的”:)

一些辅助函数

我将使用一些辅助函数来模拟问题的两个方面。 首先,这是一个生成参数的工厂 – 每次调用它时,它将返回以1开头的整数序列中的下一个整数:

 // An infinite supply of parameters private static int count = 0; public int ParameterFactory() { return ++count; } 

接下来,这是一个将您的Rest调用模拟为IObservable的函数。 此函数接受整数并且:

  • 如果整数是偶数,则返回一个立即发送OnError的Observable。
  • 如果整数是奇数,则返回一个字符串,将整数与“-ret”连接起来,但仅在经过一秒后才返回。 我们将使用它来检查轮询间隔是否按照您的请求运行 – 作为完成调用之间的暂停,无论它们花费多长时间,而不是常规间隔。

这里是:

 // A asynchronous function representing the REST call public IObservable SomeRestCall(int x) { return x % 2 == 0 ? Observable.Throw(new Exception()) : Observable.Return(x + "-ret").Delay(TimeSpan.FromSeconds(1)); } 

现在好位

下面是一个相当通用的可重用函数,我称之为Poll 。 它接受一个将被轮询的异步函数,该函数的参数工厂,所需的rest(没有双关语!)间隔,最后是一个要使用的IScheduler。

我能想到的最简单的方法是使用Observable.Create ,它使用调度程序来驱动结果流。 ScheduleAsync是一种使用.NET async / await表单的调度方式。 这是一个.NET习惯用法,允许您以强制方式编写异步代码。 async关键字引入了一个异步函数,然后可以await它的正文中的一个或多个异步调用,并且只有在调用完成时才会继续。 我在这个问题中写了这种调度风格的长篇解释,其中包括较旧的递归风格,这种风格可能更容易在rx-java方法中实现。 代码如下所示:

 public IObservable> Poll( Func> asyncFunction, Func parameterFactory, TimeSpan interval, IScheduler scheduler) { return Observable.Create>(observer => { return scheduler.ScheduleAsync(async (ctrl, ct) => { while(!ct.IsCancellationRequested) { try { var result = await asyncFunction(parameterFactory()); observer.OnNext(Either.Right(result)); } catch(Exception ex) { observer.OnNext(Either.Left(ex)); } await ctrl.Sleep(interval, ct); } }); }); } 

打破这一点, Observable.Create通常是一个创建IObservables的工厂,它可以让你对结果发布给观察者的方式有很大的控制。 它经常被忽视,支持不必要的复杂原始组合。

在这种情况下,我们使用它来创建一个Either流,以便我们可以返回成功和失败的轮询结果。

Create函数接受一个观察者,该观察者代表我们通过OnNext / OnError / OnCompleted将结果传递给的订阅者。 我们需要在Create调用中返回一个IDisposable – 在.NET中,这是订阅者可以取消订阅的句柄。 这在这里特别重要,因为轮询将永远继续下去 – 或者至少它永远不会完成。

ScheduleAsync (或普通Schedule )的结果就是这样一个句柄。 处置后,它将取消我们预定的任何未决事件 – 从而结束轮询循环。 在我们的例子中,我们用来管理间隔的Sleep是可取消的操作,尽管可以很容易地修改Poll函数以接受也接受CancellationToken的可取消的asyncFunction

ScheduleAsync方法接受将调用以调度事件的函数。 它传递了两个参数,第一个ctrl是调度程序本身。 第二个ct是CancellationToken我们可以用来查看是否已经请求取消(由订阅者处理他们的订阅句柄)。

轮询本身通过无限循环执行,该循环仅在CancellationToken指示已请求取消时终止。

在循环中,我们可以使用async / await的魔力来异步调用轮询函数,但仍然将它包装在exception处理程序中。 这太棒了! 假设没有错误,我们通过OnNext将结果作为Either正确值发送给观察者。 如果有exception,我们将其作为Either值发送给观察者。 最后,我们使用调度程序上的Sleep函数在rest间隔之后调度唤醒调用 – 不要与Thread.Sleep调用混淆,这通常不会阻塞任何线程。 请注意,Sleep接受CancellationToken ,也可以中止!

我想你会同意这是一个非常酷的async / await使用来简化本来一个非常棘手的问题!

示例用法

最后,这里有一些调用Poll测试代码以及​​示例输出 – 对于LINQPad风扇,本答案中的所有代码将在LINQPad中运行,并引用Rx 2.1程序集:

 void Main() { var subscription = Poll(SomeRestCall, ParameterFactory, TimeSpan.FromSeconds(5), ThreadPoolScheduler.Instance) .TimeInterval() .Subscribe(x => { Console.Write("Interval: " + x.Interval); var result = x.Value; if(result.IsRight) Console.WriteLine(" Success: " + result.Right); else Console.WriteLine(" Error: " + result.Left.Message); }); Console.ReadLine(); subscription.Dispose(); } Interval: 00:00:01.0027668 Success: 1-ret Interval: 00:00:05.0012461 Error: Exception of type 'System.Exception' was thrown. Interval: 00:00:06.0009684 Success: 3-ret Interval: 00:00:05.0003127 Error: Exception of type 'System.Exception' was thrown. Interval: 00:00:06.0113053 Success: 5-ret Interval: 00:00:05.0013136 Error: Exception of type 'System.Exception' was thrown. 

请注意,如果立即返回错误,则结果之间的间隔为5秒(轮询间隔),或者成功结果为6秒(轮询间隔加模拟REST调用持续时间)。

编辑 – 这是一个替代实现, 使用ScheduleAsync,但使用旧式递归调度,没有async / await语法。 正如您所看到的,它更加混乱 – 但它也支持取消asyncFunction observable。

  public IObservable> Poll( Func> asyncFunction, Func parameterFactory, TimeSpan interval, IScheduler scheduler) { return Observable.Create>( observer => { var disposable = new CompositeDisposable(); var funcDisposable = new SerialDisposable(); bool cancelRequested = false; disposable.Add(Disposable.Create(() => { cancelRequested = true; })); disposable.Add(funcDisposable); disposable.Add(scheduler.Schedule(interval, self => { funcDisposable.Disposable = asyncFunction(parameterFactory()) .Finally(() => { if (!cancelRequested) self(interval); }) .Subscribe( res => observer.OnNext(Either.Right(res)), ex => observer.OnNext(Either.Left(ex))); })); return disposable; }); } 

请参阅我的其他答案,了解避免使用.NET 4.5 async / awaitfunction并且不使用Schedule调用的其他方法。

我希望这对rx-java家伙有所帮助!

我已经清理了直接使用Schedule调用的方法 – 使用我的其他答案中的Either类型 – 它也可以使用相同的测试代码并给出相同的结果:

  public IObservable> Poll2( Func> asyncFunction, Func parameterFactory, TimeSpan interval, IScheduler scheduler) { return Observable.Create>( observer => Observable.Defer(() => asyncFunction(parameterFactory())) .Select(Either.Right) .Catch, Exception>( ex => Observable.Return(Either.Left(ex))) .Concat(Observable.Defer( () => Observable.Empty>() .Delay(interval, scheduler))) .Repeat().Subscribe(observer)); } 

这有适当的取消语义。

实施说明

  • 整个构造使用Repeat来获得循环行为。
  • 初始Defer用于确保在每次迭代时传递不同的参数
  • Select将OnNext结果投影到右侧的Either
  • Catch将OnError结果投影到左侧的Either – 请注意,此exception终止当前的asyncFunction observable,因此需要重复
  • Concat增加了间隔延迟

我的观点是调度版本更具可读性,但是这个版本不使用async / await,因此与.NET 4.0兼容。