Rx后退并重试

这基于此SO中提供的代码: 编写Rx“RetryAfter”扩展方法

我正在使用Markus Olsson的代码(目前仅进行评估),之前有人问我试图在Github上抓住Markus,但是我在工作的地方被封锁了,所以我觉得我唯一能做的就是问在这里。 很抱歉,如果这与任何人都很糟糕。

所以我在一个小的演示中使用以下代码:

class Attempt1 { private static bool shouldThrow = true; static void Main(string[] args) { Generate().RetryWithBackoffStrategy(3, MyRxExtensions.ExponentialBackoff, ex => { return ex is NullReferenceException; }, Scheduler.TaskPool) .Subscribe( OnNext, OnError ); Console.ReadLine(); } private static void OnNext(int val) { Console.WriteLine("subscriber value is {0} which was seen on threadId:{1}", val, Thread.CurrentThread.ManagedThreadId); } private static void OnError(Exception ex) { Console.WriteLine("subscriber bad {0}, which was seen on threadId:{1}", ex.GetType(), Thread.CurrentThread.ManagedThreadId); } static IObservable Generate() { return Observable.Create( o => { Scheduler.TaskPool.Schedule(() => { if (shouldThrow) { shouldThrow = false; Console.WriteLine("ON ERROR NullReferenceException"); o.OnError(new NullReferenceException("Throwing")); } Console.WriteLine("Invoked on threadId:{0}", Thread.CurrentThread.ManagedThreadId); Console.WriteLine("On nexting 1"); o.OnNext(1); Console.WriteLine("On nexting 2"); o.OnNext(2); Console.WriteLine("On nexting 3"); o.OnNext(3); o.OnCompleted(); Console.WriteLine("On complete"); Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId); }); return () => { }; }); } } public static class MyRxExtensions { ///  /// An exponential back off strategy which starts with 1 second and then 4, 9, 16... ///  public static readonly Func ExponentialBackoff = n => TimeSpan.FromSeconds(Math.Pow(n, 2)); public static IObservable RetryWithBackoffStrategy( this IObservable source, int retryCount = 3, Func strategy = null, Func retryOnError = null, IScheduler scheduler = null) { strategy = strategy ?? MyRxExtensions.ExponentialBackoff; int attempt = 0; return Observable.Defer(() => { return ((++attempt == 1) ? source : source.Delay(strategy(attempt - 1), scheduler)) .Select(item => new Tuple(true, item, null)) .Catch<Tuple, Exception>(e => retryOnError(e) ? Observable.Throw<Tuple>(e) : Observable.Return(new Tuple(false, default(T), e))); }) .Retry(retryCount) .SelectMany(t => t.Item1 ? Observable.Return(t.Item2) : Observable.Throw(t.Item3)); } } 

代码对我来说很有意义,如果失败我们尝试做一个操作我们退回并重试。 我们可以指定我们想要重试的exception类型,并且我们还看到订阅者只有在重试后才能看到最终值一次(在Exception上面的演示代码中只完成了(第一次出现OnError))。

所以通常代码按预期工作,除了一件事。

如果我查看输出上面的代码产生我得到这个:

 ON ERROR NullReferenceException Invoked on threadId:10 On nexting 1 Invoked on threadId:11 On nexting 1 On nexting 2 On nexting 3 On complete Finished on threadId:10 On nexting 2 On nexting 3 On complete Finished on threadId:11 subscriber value is 1 which was seen on threadId:10 subscriber value is 2 which was seen on threadId:10 subscriber value is 3 which was seen on threadId:10 

对我来说有趣的是,订阅者的所有值都是一次性的,我希望当Generate()方法中的OnNext被调用时,Subscribers OnNext会将OnNext的值写入Console输出。

任何人都可以解释为什么会这样吗?

这是因为您在结果流上放置了Delay 。 (在第二次迭代中传递给ExponentialBackoff n值为1,延迟为1秒。)

Delay在源上运行, 但源正常进行。 Delay计划在指定的持续时间后发出的结果。 因此订阅者在Generate的逻辑运行完成后获得结果。

如果您考虑一下这就是Delay必须如何 – 否则Delay将能够以某种方式干扰上游运营商!

作为一个缓慢的消费者,可能会干扰上游运营商(不会抛出exception)。 但对于简单的Delay行为来说,这肯定是一种非常糟糕的方式。

我认为Delay不是你想要的 – 因为Delay不会延迟它的订阅。 如果你改用DelaySubscription ,你会得到我想的。 这也是链接问题中使用的内容。

您的问题很好地说明了DelayDelaySubscription之间的区别! 在这里也值得考虑Defer

这三者之间的区别是微妙但重要的,所以让我们总结三者:

  • Delay – 立即调用目标操作员获取IObservable ,在其Subscribe调用上立即在目标上Subscribe ,在指定的Scheduler上指定延迟后调度事件。

  • DelaySubscription – 立即调用目标操作符以获取IObservable 。 在其Subscribe计划上Subscribe目标以在指定的Scheduler上指定的延迟后执行。

  • Defer – 没有目标运营商。 在Subscribe运行时提供了工厂函数来获取目标IObservable并立即调用Subscribe 。 没有添加延迟,因此没有指定的Scheduler

对于发布在这篇文章上的其他任何人来说,确实是由詹姆斯世界和布兰登提出的建议(感谢chaps)。

这是完整的工作代码

 class Attempt1 { private static bool shouldThrow = true; static void Main(string[] args) { Generate().RetryWithBackoffStrategy(3, MyRxExtensions.ExponentialBackoff, ex => { return ex is NullReferenceException; }, Scheduler.TaskPool) .Subscribe( OnNext, OnError ); Console.ReadLine(); } private static void OnNext(int val) { Console.WriteLine("subscriber value is {0} which was seen on threadId:{1}", val, Thread.CurrentThread.ManagedThreadId); } private static void OnError(Exception ex) { Console.WriteLine("subscriber bad {0}, which was seen on threadId:{1}", ex.GetType(), Thread.CurrentThread.ManagedThreadId); } static IObservable Generate() { return Observable.Create( o => { Scheduler.TaskPool.Schedule(() => { if (shouldThrow) { shouldThrow = false; Console.WriteLine("ON ERROR NullReferenceException"); o.OnError(new NullReferenceException("Throwing")); } Console.WriteLine("Invoked on threadId:{0}", Thread.CurrentThread.ManagedThreadId); Console.WriteLine("On nexting 1"); o.OnNext(1); Console.WriteLine("On nexting 2"); o.OnNext(2); Console.WriteLine("On nexting 3"); o.OnNext(3); o.OnCompleted(); Console.WriteLine("On complete"); Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId); }); return () => { }; }); } } public static class MyRxExtensions { ///  /// An exponential back off strategy which starts with 1 second and then 4, 9, 16... ///  public static readonly Func ExponentialBackoff = n => TimeSpan.FromSeconds(Math.Pow(n, 2)); public static IObservable RetryWithBackoffStrategy( this IObservable source, int retryCount = 3, Func strategy = null, Func retryOnError = null, IScheduler scheduler = null) { strategy = strategy ?? MyRxExtensions.ExponentialBackoff; int attempt = 0; return Observable.Defer(() => { return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1), scheduler)) .Select(item => new Tuple(true, item, null)) .Catch, Exception>(e => retryOnError(e) ? Observable.Throw>(e) : Observable.Return(new Tuple(false, default(T), e))); }) .Retry(retryCount) .SelectMany(t => t.Item1 ? Observable.Return(t.Item2) : Observable.Throw(t.Item3)); } public static IObservable DelaySubscription(this IObservable source, TimeSpan delay, IScheduler scheduler = null) { if (scheduler == null) { return Observable.Timer(delay).SelectMany(_ => source); } return Observable.Timer(delay, scheduler).SelectMany(_ => source); } } 

这产生了所需的输出

 ON ERROR NullReferenceException Invoked on threadId:11 On nexting 1 On nexting 2 On nexting 3 On complete Finished on threadId:11 Invoked on threadId:11 On nexting 1 subscriber value is 1 which was seen on threadId:11 On nexting 2 subscriber value is 2 which was seen on threadId:11 On nexting 3 subscriber value is 3 which was seen on threadId:11 On complete Finished on threadId:11