Reactive Extensions and Retry

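A series of articles showed up on my radar this morning. It started with this question, which led me to the original example and its source code on GitHub.

I rewrote it slightly so that I could start using it in console and service applications: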

    using System;
    using System.Diagnostics.CodeAnalysis;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Threading.Tasks;

    public static class Extensions
    {
        static readonly TaskPoolScheduler Scheduler = new TaskPoolScheduler(new TaskFactory());

        // Licensed under the MIT license with <3 by GitHub

        /// <summary>
        /// An exponential back off strategy which starts with 1 second and then 4, 9, 16...
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes")]
        public static readonly Func<int, TimeSpan> ExponentialBackoff = n => TimeSpan.FromSeconds(Math.Pow(n, 2));

        /// <summary>
        /// A linear strategy which starts with 1 second and then 2, 3, 4...
        /// </summary>
        [SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes")]
        public static readonly Func<int, TimeSpan> LinearStrategy = n => TimeSpan.FromSeconds(1 * n);

        /// <summary>
        /// Returns a cold observable which retries (re-subscribes to) the source observable on error up to the
        /// specified number of times or until it successfully terminates. Allows for customizable back off strategy.
        /// </summary>
        /// <param name="source">The source observable.</param>
        /// <param name="retryCount">The number of attempts of running the source observable before failing.</param>
        /// <param name="strategy">The strategy to use in backing off, exponential by default.</param>
        /// <param name="retryOnError">A predicate determining for which exceptions to retry. Defaults to all</param>
        /// <param name="scheduler">The scheduler.</param>
        /// <returns>
        /// A cold observable which retries (re-subscribes to) the source observable on error up to the
        /// specified number of times or until it successfully terminates.
        /// </returns>
        [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
        public static IObservable<T> RetryWithBackoffStrategy<T>(
            this IObservable<T> source,
            int retryCount = 3,
            Func<int, TimeSpan> strategy = null,
            Func<Exception, bool> retryOnError = null,
            IScheduler scheduler = null)
        {
            strategy = strategy ?? ExponentialBackoff;
            scheduler = scheduler ?? Scheduler;

            if (retryOnError == null)
                retryOnError = e => true;

            int attempt = 0;

            return Observable.Defer(() =>
            {
                // The first attempt subscribes immediately; later attempts are delayed by the strategy.
                return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1), scheduler))
                    .Select(item => new Tuple<bool, T, Exception>(true, item, null))
                    // Retryable errors are rethrown so that Retry re-subscribes;
                    // all other errors are wrapped in a tuple and surfaced after Retry.
                    .Catch<Tuple<bool, T, Exception>, Exception>(e => retryOnError(e)
                        ? Observable.Throw<Tuple<bool, T, Exception>>(e)
                        : Observable.Return(new Tuple<bool, T, Exception>(false, default(T), e)));
            })
            .Retry(retryCount)
            .SelectMany(t => t.Item1
                ? Observable.Return(t.Item2)
                : Observable.Throw<T>(t.Item3));
        }
    }
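Since both strategies are plain functions from attempt number to delay, it can help to print what they return before wiring them into a retry. The sketch below only needs `System`. `Quadratic` and `Linear` copy the two lambdas from the extension class, while `Doubling` is a hypothetical third strategy that is not part of the original code. It is included because `Math.Pow(n, 2)` grows quadratically, which is not the same as doubling the delay on every attempt.

```csharp
using System;

public static class BackoffDelays
{
    // Same lambda as ExponentialBackoff in the extension class: n squared seconds.
    public static readonly Func<int, TimeSpan> Quadratic = n => TimeSpan.FromSeconds(Math.Pow(n, 2));

    // Same lambda as LinearStrategy in the extension class: n seconds.
    public static readonly Func<int, TimeSpan> Linear = n => TimeSpan.FromSeconds(1 * n);

    // Hypothetical alternative (not in the original): the delay doubles on each attempt.
    public static readonly Func<int, TimeSpan> Doubling = n => TimeSpan.FromSeconds(Math.Pow(2, n - 1));

    public static void Main()
    {
        // Print the delay each strategy would request before retry number n.
        for (int n = 1; n <= 5; n++)
        {
            Console.WriteLine(
                "retry {0}: quadratic={1}s linear={2}s doubling={3}s",
                n,
                Quadratic(n).TotalSeconds,
                Linear(n).TotalSeconds,
                Doubling(n).TotalSeconds);
        }
    }
}
```

Any `Func<int, TimeSpan>` with this shape could be passed as the `strategy` argument of `RetryWithBackoffStrategy`.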

To test how it works, I wrote this small program:

    using System;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    class Program
    {
        static void Main(string[] args)
        {
            int tryCount = 0;
            var cts = new CancellationTokenSource();
            var sched = new TaskPoolScheduler(new TaskFactory());

            var source = Observable.Defer(
                () =>
                {
                    Console.WriteLine("Action {0}", tryCount);
                    var a = 5 / tryCount++; // throws DivideByZeroException on the first attempt
                    return Observable.Return("yolo");
                });

            source.RetryWithBackoffStrategy(
                scheduler: sched,
                strategy: Extensions.LinearStrategy,
                retryOnError: exception => exception is DivideByZeroException);

            while (!cts.IsCancellationRequested)
                source.Subscribe(
                    res => { Console.WriteLine("Result: {0}", res); },
                    ex => { Console.WriteLine("Error: {0}", ex.Message); },
                    () =>
                    {
                        cts.Cancel();
                        Console.WriteLine("End Processing after {0} attempts", tryCount);
                    });
        }
    }

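Initially I thought that subscribing would automatically trigger all the subsequent retries. That was not the case, so I had to add a cancellation token and loop until it signalled that all the retries had been exhausted.

Another option is to use an AutoResetEvent: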

    class Program
    {
        static void Main(string[] args)
        {
            int tryCount = 0;
            var auto = new AutoResetEvent(false);

            var source = Observable.Defer(
                () =>
                {
                    Console.WriteLine("Action {0}", tryCount);
                    var a = 5 / tryCount++;
                    return Observable.Return("yolo");
                });

            source.RetryWithBackoffStrategy(
                strategy: Extensions.LinearStrategy,
                retryOnError: exception => exception is DivideByZeroException);

            while (!auto.WaitOne(1))
            {
                source.Subscribe(
                    res => { Console.WriteLine("Result: {0}", res); },
                    ex => { Console.WriteLine("Error: {0}", ex.Message); },
                    () =>
                    {
                        Console.WriteLine("End Processing after {0} attempts", tryCount);
                        auto.Set();
                    });
            }
        }
    }

In both cases it prints the following lines:

    Action 0
    Error: Attempted to divide by zero.
    Action 1
    Result: yolo
    End Processing after 2 attempts

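My question to this crowd is: is this the best way to use this extension? Or is there a way to subscribe to the Observable so that it re-fires itself, up to the number of retries?

Final update

Based on Brandon's suggestion, this is the correct way to subscribe: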

    internal class Program
    {
        #region Methods

        private static void Main(string[] args)
        {
            int tryCount = 0;

            IObservable<string> source = Observable.Defer(
                () =>
                {
                    Console.WriteLine("Action {0}", tryCount);
                    int a = 5 / tryCount++;
                    return Observable.Return("yolo");
                });

            source.RetryWithBackoffStrategy(
                    strategy: Extensions.ExponentialBackoff,
                    retryOnError: exception => exception is DivideByZeroException,
                    scheduler: Scheduler.Immediate)
                .Subscribe(
                    res => { Console.WriteLine("Result: {0}", res); },
                    ex => { Console.WriteLine("Error: {0}", ex.Message); },
                    () => { Console.WriteLine("End Processing after {0} attempts", tryCount); });
        }

        #endregion
    }

The output is slightly different:

    Action 0
    Action 1
    Result: yolo
    End Processing after 2 attempts

This turned out to be a very useful extension. Here is another example of how to use it, where both the strategy and the error handling are supplied as delegates.

    internal class Program
    {
        #region Methods

        private static void Main(string[] args)
        {
            int tryCount = 0;

            IObservable<string> source = Observable.Defer(
                () =>
                {
                    Console.WriteLine("Action {0}", tryCount);
                    int a = 5 / tryCount++;
                    return Observable.Return("yolo");
                });

            source.RetryWithBackoffStrategy(
                strategy: i => TimeSpan.FromMilliseconds(1),
                retryOnError: exception =>
                {
                    if (exception is DivideByZeroException)
                    {
                        Console.WriteLine("Tried to divide by zero");
                        return true;
                    }
                    return false;
                },
                scheduler: Scheduler.Immediate).Subscribe(
                    res => { Console.WriteLine("Result: {0}", res); },
                    ex => { Console.WriteLine("Error: {0}", ex.Message); },
                    () => { Console.WriteLine("Succeeded after {0} attempts", tryCount); });
        }

        #endregion
    }

Output:

    Action 0
    Tried to divide by zero
    Action 1
    Result: yolo
    Succeeded after 2 attempts

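Yes, Rx is generally asynchronous, so when writing tests you need to wait for it to finish (otherwise Main simply exits right after you call Subscribe).

Also, make sure you subscribe to the observable returned by source.RetryWithBackoffStrategy(...). That call does not change source; it produces a new observable that has the retry semantics.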
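The point about subscribing to the returned observable can be illustrated with a rough sketch. To keep it independent of the extension method, it uses the built-in `Retry(3)` operator from `System.Reactive.Linq` rather than `RetryWithBackoffStrategy`. The class name `RetrySubscriptionDemo` and the `Run` helper are made up for this illustration, and it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class RetrySubscriptionDemo
{
    // Subscribes once, either to the plain source or to the retrying observable,
    // and records everything that happens along the way.
    public static List<string> Run(bool subscribeToRetrying)
    {
        var log = new List<string>();
        int tryCount = 0;

        IObservable<string> source = Observable.Defer(() =>
        {
            log.Add("Action " + tryCount);
            int a = 5 / tryCount++; // divides by zero on the first subscription only
            return Observable.Return("yolo");
        });

        // Retry does not modify `source`; it returns a new observable.
        IObservable<string> retrying = source.Retry(3);

        (subscribeToRetrying ? retrying : source).Subscribe(
            res => log.Add("Result: " + res),
            ex => log.Add("Error: " + ex.Message),
            () => log.Add("Completed"));

        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(" | ", Run(subscribeToRetrying: false)));
        Console.WriteLine(string.Join(" | ", Run(subscribeToRetrying: true)));
    }
}
```

Subscribing to `source` is expected to end at the first error, as in the loop-based programs in the question, whereas subscribing to `retrying` should re-subscribe after the error and reach the result.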

The simplest solution in this case is to use Wait:

    try
    {
        var source2 = source.RetryWithBackoffStrategy(/*...*/);

        // blocks the current thread until the source finishes
        var result = source2.Wait();
        Console.WriteLine("result=" + result);
    }
    catch (Exception err)
    {
        // include the exception in the output; the original format string had no placeholder for it
        Console.WriteLine("uh oh: {0}", err);
    }

If you are writing your tests with something like NUnit (which supports asynchronous tests), then you can do:

    [Test]
    public async Task MyTest()
    {
        var source = /* ... */;
        var source2 = source.RetryWithBackoffStrategy(/*...*/);
        var result = await source2; // you can await observables
        Assert.That(result, Is.EqualTo(5));
    }