基于multithreading的RabbitMQ消费者

我们有一个监听单个RabbitMQ队列的Windows服务并处理该消息。

我们希望扩展相同的Windows服务,以便它可以侦听RabbitMQ的多个队列并处理消息。

不确定是否可以通过使用multithreading来实现,因为每个线程都必须列出(阻塞)队列。

由于我对multithreading很陌生,需要关注以下几点的高级指南,这将有助于我开始构建原型。

  1. 是否可以通过使用线程在单个应用程序中侦听多个队列?
  2. 如何处理如果任何单个线程被播出的情况(由于exception等),如何在不重启整个Windows服务的情况下返回。
  3. 任何设计模式或开源实现可以帮助我处理这种情况。

我喜欢你写你的问题的方式 – 它开始非常广泛,专注于具体细节。 我已经成功地实现了一些非常相似的东西,目前正在开发一个开源项目,以吸取我的经验并将它们带回社区。 不幸的是,虽然 – 我还没有整齐地打包我的代码,这对你没有多大帮助! 无论如何,回答你的问题:

1. Is it possible to use threading for multiple queues.

答:是的,但它可能充满了陷阱。 也就是说,RabbitMQ .NET库并不是最好的代码片段,我发现它是AMQP协议的一个相对麻烦的实现。 其中一个最有害的警告是它如何处理“接收”或“消费”行为,如果你不小心,这会很容易导致死锁。 幸运的是,它在API文档中得到了很好的说明。 建议 – 如果可以,请使用单例连接对象。 然后,在每个线程中,使用连接创建新的IModel和相应的使用者。

2. How to gracefully handle exceptions in threads – 我相信这是另一个主题,我不会在这里解决它,因为有几种方法可以使用。

3. Any open-source projects? – 我喜欢EasyNetQ背后的想法 ,尽管我最后还是自己动手了。 我希望记得在我的开源项目完成后回过头来看,因为我认为它比EasyNetQ更好。

您可能会发现此答案非常有用。 我对RabbitMQ的工作方式有一个非常基本的了解,但我可能会继续每个通道每个通道一个用户 ,正如那里所建议的那样。

为此组织线程模型肯定有多个选项。 实际实现将取决于您需要如何处理来自多个队列的消息:并行,或通过聚合它们并序列化处理。 以下代码是一个控制台应用程序,它实现了后一种情况的模拟。 它使用Task Parallel Library和BlockingCollection类(这对于这种任务非常方便)。

 using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace Console_21842880 { class Program { BlockingCollection _commonQueue; // process an individual queue void ProcessQueue(int id, BlockingCollection queue, CancellationToken token) { while (true) { // observe cancellation token.ThrowIfCancellationRequested(); // get a message, this blocks and waits var message = queue.Take(token); // process this message // just place it to the common queue var wrapperMessage = "queue " + id + ", message: " + message; _commonQueue.Add(wrapperMessage); } } // process the common aggregated queue void ProcessCommonQeueue(CancellationToken token) { while (true) { // observe cancellation token.ThrowIfCancellationRequested(); // this blocks and waits // get a message, this blocks and waits var message = _commonQueue.Take(token); // process this message Console.WriteLine(message.ToString()); } } // run the whole process async Task RunAsync(CancellationToken token) { var queues = new List>(); _commonQueue = new BlockingCollection(); // start individual queue processors var tasks = Enumerable.Range(0, 4).Select((i) => { var queue = new BlockingCollection(); queues.Add(queue); return Task.Factory.StartNew( () => ProcessQeueue(i, queue, token), TaskCreationOptions.LongRunning); }).ToList(); // start the common queue processor tasks.Add(Task.Factory.StartNew( () => ProcessCommonQeueue(token), TaskCreationOptions.LongRunning)); // start the simulators tasks.AddRange(Enumerable.Range(0, 4).Select((i) => SimulateMessagesAsync(queues, token))); // wait for all started tasks to complete await Task.WhenAll(tasks); } // simulate a message source async Task SimulateMessagesAsync(List> queues, CancellationToken token) { var random = new Random(Environment.TickCount); while (true) { token.ThrowIfCancellationRequested(); await Task.Delay(random.Next(100, 1000)); var queue = queues[random.Next(0, queues.Count)]; var message = Guid.NewGuid().ToString() + " " + DateTime.Now.ToString(); queue.Add(message); } } // entry point static void Main(string[] args) { Console.WriteLine("Ctrl+C to stop..."); var cts = new CancellationTokenSource(); Console.CancelKeyPress += (s, e) => { // cancel upon Ctrl+C e.Cancel = true; cts.Cancel(); }; try { new Program().RunAsync(cts.Token).Wait(); } catch (Exception ex) { if (ex is AggregateException) ex = ex.InnerException; Console.WriteLine(ex.Message); } Console.WriteLine("Press Enter to exit"); Console.ReadLine(); } } } 

另一个想法可能是使用Reactive Extensions(Rx) 。 如果您可以将到达的消息视为事件,则Rx可以帮助将它们聚合为单个流。