在ITargetBlock 中重试策略

我需要在工作流程中引入重试策略。 假设有3个块以这种方式连接:

var executionOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 }; var buffer = new BufferBlock(); var processing = new TransformBlock(..., executionOptions); var send = new ActionBlock(...); buffer.LinkTo(processing); processing.LinkTo(send); 

因此,有一个缓冲区累积数据,然后将其发送到变换块,该变换块一次不处理3个项目,然后将结果发送到动作块。

在处理过程中可能会出现变换块瞬态错误,如果错误是瞬态错误,我想重试该块。

我知道块通常不可重试(传递到块中的委托可以被重试)。 其中一个选项是包装传递给支持重试的委托。

我也知道有一个非常好的库TransientFaultHandling.Core ,它为瞬态故障提供重试机制。 这是一个很棒的图书馆,但不是我的情况。 如果我将传递给转换块的委托包装到RetryPolicy.ExecuteAsync方法中,则转换块内的消息将被锁定,并且在重试完成或失败之前,转换块将无法接收新消息。 想象一下,如果所有3条消息都输入到重试中(假设下一次重试尝试将在2分钟内完成)并且失败,则变换块将被卡住,直到至少有一条消息离开变换块。

我看到的唯一解决方案是扩展ITargetBlock (实际上, ITargetBlock也足够了),并手动重试(如此处所示):

 do { try { return await transform(input); } catch { if( numRetries  processing.Post(message)); } } while( numRetries-- > 0 ); 

ig将消息再次放入变换块中,但是在这种情况下,重试上下文(剩余的重试次数等)也应该传递给该块。 听起来太复杂了……

有没有人看到更简单的方法来实现工作流程块的重试策略?

我认为你几乎必须这样做,你必须跟踪一条消息的剩余重试次数,你必须以某种方式安排重试尝试。

但是你可以通过将其封装在一个单独的方法中来使这更好。 就像是:

 // it's a private class, so public fields are okay private class RetryingMessage { public T Data; public int RetriesRemaining; public readonly List Exceptions = new List(); } public static IPropagatorBlock CreateRetryingBlock( Func> transform, int numberOfRetries, TimeSpan retryDelay, Action> failureHandler) { var source = new TransformBlock>( input => new RetryingMessage { Data = input, RetriesRemaining = numberOfRetries }); // TransformManyBlock, so that we can propagate zero results on failure TransformManyBlock, TOutput> target = null; target = new TransformManyBlock, TOutput>( async message => { try { return new[] { await transform(message.Data) }; } catch (Exception ex) { message.Exceptions.Add(ex); if (message.RetriesRemaining == 0) { failureHandler(message.Exceptions); } else { message.RetriesRemaining--; Task.Delay(retryDelay) .ContinueWith(_ => target.Post(message)); } return null; } }); source.LinkTo( target, new DataflowLinkOptions { PropagateCompletion = true }); return DataflowBlock.Encapsulate(source, target); } 

我已经添加了代码来跟踪exception,因为我认为不应该忽略失败,它们应该至少被记录。

此外,此代码在完成时不能很好地工作:如果有重试等待它们的延迟并且您Complete()块,它将立即完成并且重试将丢失。 如果这对您来说是个问题,那么当source完成并且没有重试等待时,您将必须跟踪未完成的reties并完成target

除了svick的出色答案,还有其他几个选择:

  1. 您可以使用TransientFaultHandling.Core – 只需将MaxDegreeOfParallelism设置为Unbounded以便其他消息可以通过。
  2. 您可以修改块输出类型以包括失败指示和重试计数,并创建数据流循环,将filter传递给LinkTo ,以检查是否需要进行其他重试。 这种方法更复杂; 如果正在进行重试,则必须向块添加延迟,并添加TransformBlock以删除网格其余部分的失败/重试信息。