强制EventProcessorHost将失败的Azure Event Hub eventData重新传递给IEventProcessor.ProcessEvents方法

该应用程序使用.NET 4.6.1和Microsoft.Azure.ServiceBus.EventProcessorHost nuget包v2.0.2及其依赖WindowsAzure.ServiceBus包v3.0.1来处理Azure Event Hub消息。

该应用程序具有IEventProcessor的实现。 当从ProcessEventsAsync方法抛出未处理的exception时, EventProcessorHost永远不会将这些消息重新发送到正在运行的IEventProcessor实例。 (有趣的是,如果托管应用程序停止并重新启动,或者租约丢失并重新获得,它将重新发送。)

有没有办法强制导致exception的事件消息被EventProcessorHost重新发送到IEventProcessor实现?

本评论中提供了一个可能的解决方案,该问题几乎完全相同: IEventProcessor.ProcessEventsAsync中的Redeliver未处理的EventHub消息

评论建议在ProcessEventsAsync发生exception时,保留最后一个成功处理的事件消息的副本,并使用该消息显式检查点。 但是,在实现和测试此类解决方案后, EventProcessorHost仍然不会重新发送。 实现非常简单:

 private EventData _lastSuccessfulEvent; public async Task ProcessEventsAsync( PartitionContext context, IEnumerable messages) { try { await ProcessEvents(context, messages); // does actual processing, may throw exception _lastSuccessfulEvent = messages .OrderByDescending(ed => ed.SequenceNumber) .First(); } catch(Exception ex) { await context.CheckpointAsync(_lastSuccessfulEvent); } } 

对行动中的事情的分析: 在此处输入图像描述

部分日志示例可在此处获取: https : //gist.github.com/ttbjj/4781aa992941e00e4e15e0bf1c45f316#file-gistfile1-txt

TLDR :将失败的一批事件重新播放到 IEventProcessor.ProcessEventsAsync的唯一可靠方法是 – 立即Shutdown EventProcessorHost (又名EPH – 使用eph.UnregisterEventProcessorAsync()或终止进程 – 根据情况。 这将允许其他EPH实例获取此分区的租约并从上一个检查点开始。

在解释之前 – 我想说出来,这是一个很好的问题 ,实际上,这是我们为EPH做出的最艰难的设计选择之一。 在我看来,这是一个权衡b / w: EPH框架的usability / supportability ,而不是Technical-Correctness

理想情况应该是:当IEventProcessorImpl.ProcessEventsAsync的用户代码抛出exception时 – EPH库不应该捕获这个。 它应该让这个Exception – 崩溃进程和crash-dump清楚地显示callstack负责。 我仍然相信 – 这是technically-correcttechnically-correct解决方案。

当前情况IEventProcessorImpl.ProcessEventsAsync API和EPH的合同是,

  1. 只要可以从EventHubs服务接收EventData – 继续使用EventData's调用用户回调( IEventProcessorImplementation.ProcessEventsAsync )&如果用户回调在调用时抛出错误,则通知EventProcessorOptions.ExceptionReceived
  2. IEventProcessorImpl.ProcessEventsAsync用户代码应该处理所有错误并根据需要包含Retry'sEPH不会在此回叫中设置任何超时,以便用户完全控制处理时间。
  3. 如果特定事件是导致问题的原因 – 使用特殊属性标记EventData – 例如:type = poison-event并重新发送到同一EventHub (包括指向实际事件的指针,复制这些EventData.OffsetSequenceNumber进入新的EventData.ApplicationProperties )或将其转发到SERVICEBUS队列或将其存储在其他地方,基本上, 识别和推迟处理毒物事件
  4. 如果你处理了所有可能的情况并且仍在运行Exceptions – catch’em并关闭EPH或者使用此exception使进程失败。 当EPH重新启动时 – 它将从左侧开始。

为什么检查指向“旧事件”不起作用 (阅读此内容以了解EPH ):

在幕后, EPH正在为每个EventHub Consumergroup分区的接收器运行一个泵 – 它的工作是从给定的checkpoint (如果存在)启动接收器并创建IEventProcessor实现的专用实例,然后从指定的Offset receive指定的EventHub分区在检查点(如果不存在 – EventProcessorOptions.initialOffsetProvider )并最终调用IEventProcessorImpl.ProcessEventsAsync 。 当EPH进程Shutsdown和Partition的所有权被移动到另一个EPH实例时, Checkpoint的目的是能够可靠地开始处理消息。 因此, checkpoint仅在启动PUMP时消耗,并且一旦泵启动就不会被读取。

正如我写的那样, EPH的版本是2.2.10 ……

简单回答:您是否尝试过EventProcessorHost.ResetConnection(string partiotionId) ?

复杂的答案:这可能是一个需要解决的架构问题,为什么处理失败? 这是暂时的错误吗? 正在重试处理逻辑是一种可能的情况? 等等…