Observable.Retry无法按预期工作

我有一系列使用异步方法处理的数字 。 我正在模拟可能失败远程服务调用 。 如果失败,我想重试,直到电话成功。

问题是,对于我正在尝试的代码, 每次在异步方法中抛出exception时,序列似乎永远都会挂起

你可以用这个简单的代码片段测试它(在LINQPad中测试)

Random rnd = new Random(); void Main() { var numbers = Enumerable.Range(1, 10).ToObservable(); var processed = numbers.SelectMany(n => Process(n).ToObservable().Retry()); processed.Subscribe( f => Console.WriteLine(f)); } public async Task Process(int n) { if (rnd.Next(2) == 1) { throw new InvalidOperationException(); } await Task.Delay(2000); return n*10; } 

它应该处理每个元素,重试那些失败的元素。 相反,它永远不会结束,我不知道为什么。

我怎样才能做到我想要的呢?

编辑:(感谢@CharlesNRice和@JonSkeet的线索!):

这有效!

 Random rnd = new Random(); void Main() { var numbers = Enumerable.Range(1, 10).ToObservable(); var processed = numbers.SelectMany(n => RetryTask(() => MyTask(n)).ToObservable()); processed.Subscribe(f => Console.WriteLine(f)); } private async Task MyTask(int n) { if (rnd.Next(2) == 1) { throw new InvalidOperationException(); } await System.Threading.Tasks.Task.Delay(2000); return n * 10; } async Task RetryTask(Func<Task> myTask, int? retryCount = null) { while (true) { try { return await myTask(); } catch (Exception) { Debug.WriteLine("Retrying..."); if (retryCount.HasValue) { if (retryCount == 0) { throw; } retryCount--; } } } } 

在这种情况下,滚动你自己的Retry是过度的。 只需将方法调用包装在Defer块中即可实现相同的function,并在重试时重新执行。

 var numbers = Enumerable.Range(1, 10).ToObservable(); var processed = numbers.SelectMany(n => //Defer call passed method every time it is subscribed to, //Allowing the Retry to work correctly. Observable.Defer(() => Process(n).ToObservable()).Retry() ); processed.Subscribe( f => Console.WriteLine(f)); 

您正在重试处于故障状态的同一任务。 重试将重新订阅可观察的来源。 重试的来源是ToObservable()。 它不会像任务工厂那样行动并创建一个新的任务,并且由于任务出现故障,它将继续重试故障任务并永远不会成功。

您可以查看此答案如何制作自己的重试包装器https://stackoverflow.com/a/6090049/1798889