在exception时重新排队消息

我正在寻找一种可靠的方法来重新排队无法正确处理的消息 – 目前。

我一直在关注http://dotnetcodr.com/2014/06/16/rabbitmq-in-net-c-basic-error-handling-in-receiver/ ,似乎支持在RabbitMQ中重新排队消息API。

else //reject the message but push back to queue for later re-try { Console.WriteLine("Rejecting message and putting it back to the queue: {0}", message); model.BasicReject(deliveryArguments.DeliveryTag, true); } 

但是我正在使用EasyNetQ。 所以想知道我将如何做类似的事情。

 bus.Subscribe("my_subscription_id", msg => { try { // do work... could be long running } catch () { // something went wrong - requeue message } }); 

这甚至是一个好方法吗? 如果do work超过RabbitMQ服务器等待ACK超时, do work确认消息可能导致问题。

所以我提出了这个解决方案。 这取代了EasyNetQ的默认错误策略。

 public class DeadLetterStrategy : DefaultConsumerErrorStrategy { public DeadLetterStrategy(IConnectionFactory connectionFactory, ISerializer serializer, IEasyNetQLogger logger, IConventions conventions, ITypeNameSerializer typeNameSerializer) : base(connectionFactory, serializer, logger, conventions, typeNameSerializer) { } public override AckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception) { object deathHeaderObject; if (!context.Properties.Headers.TryGetValue("x-death", out deathHeaderObject)) return AckStrategies.NackWithoutRequeue; var deathHeaders = deathHeaderObject as IList; if (deathHeaders == null) return AckStrategies.NackWithoutRequeue; var retries = 0; foreach (IDictionary header in deathHeaders) { var count = int.Parse(header["count"].ToString()); retries += count; } if (retries < 3) return AckStrategies.NackWithoutRequeue; return base.HandleConsumerError(context, exception); } } 

你这样替换它:

 RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register()) 

您必须使用AdvancedBus因此您必须手动设置所有内容。

 using (var bus = RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register())) { var deadExchange = bus.Advanced.ExchangeDeclare("exchange.text.dead", ExchangeType.Direct); var textExchange = bus.Advanced.ExchangeDeclare("exchange.text", ExchangeType.Direct); var queue = bus.Advanced.QueueDeclare("queue.text", deadLetterExchange: deadExchange.Name); bus.Advanced.Bind(deadExchange, queue, ""); bus.Advanced.Bind(textExchange, queue, ""); bus.Advanced.Consume(queue, (message, info) => HandleTextMessage(message, info)); } 

这将死信3次失败。 之后,它将转到EasyNetQ提供的默认错误队列以进行error handling。 您可以订阅该队列。

当exception传播出您的使用者方法时,消息将被删除。 所以这会引发一封死信。

 static void HandleTextMessage(IMessage textMessage, MessageReceivedInfo info) { throw new Exception("This is a test!"); } 

据我所知,没有办法用EasyNetQ手动ackrejectreject消息。

我看到你已经与EasyNetQ团队开了一张发票 ,关于这个…但是没有答案。

FWIW,这是一件非常合适的事情。 我使用的所有库都支持此function集(在NodeJS中)并且很常见。 我很惊讶EasyNetQ不支持这个。