强制EventProcessorHost将失败的Azure Event Hub eventData重新传递给IEventProcessor.ProcessEvents方法
该应用程序使用.NET 4.6.1和Microsoft.Azure.ServiceBus.EventProcessorHost nuget包v2.0.2及其依赖WindowsAzure.ServiceBus包v3.0.1来处理Azure Event Hub消息。
该应用程序具有IEventProcessor
的实现。 当从ProcessEventsAsync
方法抛出未处理的exception时, EventProcessorHost
永远不会将这些消息重新发送到正在运行的IEventProcessor
实例。 (有趣的是,如果托管应用程序停止并重新启动,或者租约丢失并重新获得,它将重新发送。)
有没有办法强制导致exception的事件消息被EventProcessorHost
重新发送到IEventProcessor
实现?
本评论中提供了一个可能的解决方案,该问题几乎完全相同: IEventProcessor.ProcessEventsAsync中的Redeliver未处理的EventHub消息
评论建议在ProcessEventsAsync
发生exception时,保留最后一个成功处理的事件消息的副本,并使用该消息显式检查点。 但是,在实现和测试此类解决方案后, EventProcessorHost
仍然不会重新发送。 实现非常简单:
private EventData _lastSuccessfulEvent; public async Task ProcessEventsAsync( PartitionContext context, IEnumerable messages) { try { await ProcessEvents(context, messages); // does actual processing, may throw exception _lastSuccessfulEvent = messages .OrderByDescending(ed => ed.SequenceNumber) .First(); } catch(Exception ex) { await context.CheckpointAsync(_lastSuccessfulEvent); } }
对行动中的事情的分析:
部分日志示例可在此处获取: https : //gist.github.com/ttbjj/4781aa992941e00e4e15e0bf1c45f316#file-gistfile1-txt
TLDR :将失败的一批事件重新播放到 IEventProcessor.ProcessEventsAsync的唯一可靠方法是 – 立即Shutdown
EventProcessorHost
(又名EPH
) – 使用eph.UnregisterEventProcessorAsync()
或终止进程 – 根据情况。 这将允许其他EPH
实例获取此分区的租约并从上一个检查点开始。
在解释之前 – 我想说出来,这是一个很好的问题 ,实际上,这是我们为EPH
做出的最艰难的设计选择之一。 在我看来,这是一个权衡b / w: EPH
框架的usability
/ supportability
,而不是Technical-Correctness
。
理想情况应该是:当IEventProcessorImpl.ProcessEventsAsync
的用户代码抛出exception时 – EPH
库不应该捕获这个。 它应该让这个Exception
– 崩溃进程和crash-dump
清楚地显示callstack
负责。 我仍然相信 – 这是technically-correct
最technically-correct
解决方案。
当前情况 : IEventProcessorImpl.ProcessEventsAsync
API和EPH
的合同是,
- 只要可以从EventHubs服务接收
EventData
– 继续使用EventData's
调用用户回调(IEventProcessorImplementation.ProcessEventsAsync
)&如果用户回调在调用时抛出错误,则通知EventProcessorOptions.ExceptionReceived
。 -
IEventProcessorImpl.ProcessEventsAsync
用户代码应该处理所有错误并根据需要包含Retry's
。EPH
不会在此回叫中设置任何超时,以便用户完全控制处理时间。 - 如果特定事件是导致问题的原因 – 使用特殊属性标记
EventData
– 例如:type =poison-event
并重新发送到同一EventHub
(包括指向实际事件的指针,复制这些EventData.Offset
和SequenceNumber
进入新的EventData.ApplicationProperties
)或将其转发到SERVICEBUS队列或将其存储在其他地方,基本上, 识别和推迟处理毒物事件 。 - 如果你处理了所有可能的情况并且仍在运行
Exceptions
– catch’em并关闭EPH
或者使用此exception使进程失败。 当EPH
重新启动时 – 它将从左侧开始。
为什么检查指向“旧事件”不起作用 (阅读此内容以了解EPH
):
在幕后, EPH
正在为每个EventHub Consumergroup分区的接收器运行一个泵 – 它的工作是从给定的checkpoint
(如果存在)启动接收器并创建IEventProcessor
实现的专用实例,然后从指定的Offset
receive
指定的EventHub分区在检查点(如果不存在 – EventProcessorOptions.initialOffsetProvider
)并最终调用IEventProcessorImpl.ProcessEventsAsync
。 当EPH
进程Shutsdown和Partition的所有权被移动到另一个EPH
实例时, Checkpoint
的目的是能够可靠地开始处理消息。 因此, checkpoint
仅在启动PUMP时消耗,并且一旦泵启动就不会被读取。
正如我写的那样, EPH
的版本是2.2.10 ……
简单回答:您是否尝试过EventProcessorHost.ResetConnection(string partiotionId) ?
复杂的答案:这可能是一个需要解决的架构问题,为什么处理失败? 这是暂时的错误吗? 正在重试处理逻辑是一种可能的情况? 等等…