如何使用EasyNetQ / RabbitMQ进行error handling

我正在使用带有EasyNetQ库的C#中的RabbitMQ。 我在这里使用了pub / sub模式。 我仍然有一些问题希望有人可以帮助我:

  1. 在消费消息时出现错误时,它会自动移至错误队列。 如何实现重试(以便将其放回原始队列,当它无法处理X次时,它会移动到死信队列)?
  2. 据我所知,总有1个错误队列用于转储来自所有其他队列的消息。 如何为每种类型设置1个错误队列,以便每个队列都有自己的关联错误队列?
  3. 如何轻松重试错误队列中的消息? 我试过Hosepipe,但它只是将消息重新发布到错误队列而不是原始队列。 我不喜欢这个选项,因为我不想在控制台中摆弄。 我最好只针对错误队列进行编程。

任何人?

EasyNetQ / RabbitMQ遇到的问题是,与其他消息服务(如SQS或Azure Service Bus / Queues)相比,它更加“原始”,但我会尽力指出正确的方向。

问题1。

这将是你要做的。 最简单的方法是你可以在RabbitMQ / EasyNetQ中发送No-Ack消息,它将被放置在队列的头部,供你重试。 这不是真的可取,因为它几乎会立即重试(没有时间延迟),并且还会阻止其他消息被处理(如果您有一个预取计数为1的单个订户)。

我见过使用“MessageEnvelope”的其他实现。 这是一个包装类,当消息失败时,您在MessageEnvelope上递增重试变量,并将消息重新发送回队列。 你必须这样做并在你的消息处理程序周围编写包装代码,它不是EasyNetQ的function。

使用上面的内容,我也看到人们使用信封,但允许信息被删除。 一旦它出现在死信队列中,就会有另一个应用程序/工作人员从死信队列中读取项目。

上述所有这些方法都有一个小问题,因为在处理消息时没有任何好的方法可以使对数/指数/任何类型的增加延迟。 在将消息返回队列之前,您可以在代码中“保留”一段时间,但这并不是一个好方法。

在所有这些选项中,您自己的自定义应用程序读取死信队列并决定是否根据包含重试计数的信封重新路由邮件可能是最好的方法。

问题2。

您可以使用高级API为每个队列指定死信交换。 ( https://github.com/EasyNetQ/EasyNetQ/wiki/The-Advanced-API#declaring-queues )。 但是,这意味着您必须在任何地方使用高级API,因为使用订阅/发布的简单IBus实现会查找基于消息类型和订户名称命名的队列。 使用队列的自定义声明意味着您将自己处理队列的命名,这意味着当您订阅时,您将需要知道您想要的名称等等。不再为您自动订阅!

问题3

错误队列/死信队列只是另一个队列。 您可以收听此队列并执行您需要执行的操作。 但实际上没有任何开箱即用的解决方案听起来像是符合您的需求。

我已经完全按照你的描述实现了。 以下是根据我的经验并与您的每个问题相关的一些提示。

Q1(如何重试X次):

为此,您可以使用IMessage.Body.BasicProperties.Headers 。 当您从错误队列中使用消息时,只需添加一个包含您选择的名称的标头。 在每个进入错误队列的消息中查找此标头并将其递增。 这将为您提供正在运行的重试计数。

当消息超过X的重试限制时,您有一个策略来执行该操作非常重要 。您不希望丢失该消息。 就我而言,我在那时将消息写入磁盘。 它为您提供了许多有用的调试信息,以便稍后再回来,因为EasyNetQ会自动将您的原始消息包含在错误信息中。 它还具有原始消息,以便您可以(如果您愿意)手动(或可能通过某些批处理重新处理代码自动化)以某种受控方式重新排列消息。

您可以查看Hosepipe实用程序中的代码,以查看执行此操作的好方法。 事实上,如果您按照您在那里看到的模式,那么您甚至可以在以后使用Hosepipe来重新排队消息(如果需要)。

Q2(如何为每个始发队列创建一个错误队列):

您可以使用EasyNetQ Advanced Bus干净利落地完成此操作。 使用IBus.Advanced.Container.Resolve来获取约定界面。 然后,您可以使用conventions.ErrorExchangeNamingConventionconventions.ErrorQueueNamingConvention设置错误队列命名的conventions.ErrorQueueNamingConvention 。 在我的例子中,我将约定设置为基于原始队列的名称,以便每次创建队列时都获得队列/ queue_error对队列。

Q3(如何处理错误队列中的消息):

您可以像执行任何其他队列一样为错误队列声明使用者。 同样,AdvancedBus允许您通过指定队列中的类型是EasyNetQ.SystemMessage.Error来干净地执行此EasyNetQ.SystemMessage.Error 。 因此, IAdvancedBus.Consume()将帮助您。 重试只是意味着重新发布到原始交换(注意你在头文件中放置的重试次数(参见上面我对Q1的回答),并且你从错误队列消耗的错误消息中的信息可以帮助你找到目标重新发布。