从多个队列中读取,RabbitMQ

我是RabbitMQ的新手。 我希望能够在有多个队列(要读取)的情况下处理读取消息而不会阻塞。 有关如何做到这一点的任何意见?

//编辑1

public class Rabbit : IMessageBus { private List publishQ = new List(); private List subscribeQ = new List(); ConnectionFactory factory = null; IConnection connection = null; IModel channel = null; Subscription sub = null; public void writeMessage( Measurement m1 ) { byte[] body = Measurement.AltSerialize( m1 ); int msgCount = 1; Console.WriteLine("Sending message to queue {1} via the amq.direct exchange.", m1.id); string finalQueue = publishToQueue( m1.id ); while (msgCount --> 0) { channel.BasicPublish("amq.direct", finalQueue, null, body); } Console.WriteLine("Done. Wrote the message to queue {0}.\n", m1.id); } public string publishToQueue(string firstQueueName) { Console.WriteLine("Creating a queue and binding it to amq.direct"); string queueName = channel.QueueDeclare(firstQueueName, true, false, false, null); channel.QueueBind(queueName, "amq.direct", queueName, null); Console.WriteLine("Done. Created queue {0} and bound it to amq.direct.\n", queueName); return queueName; } public Measurement readMessage() { Console.WriteLine("Receiving message..."); Measurement m = new Measurement(); int i = 0; foreach (BasicDeliverEventArgs ev in sub) { m = Measurement.AltDeSerialize(ev.Body); //m.id = //get the id here, from sub if (++i == 1) break; sub.Ack(); } Console.WriteLine("Done.\n"); return m; } public void subscribeToQueue(string queueName ) { sub = new Subscription(channel, queueName); } public static string MsgSysName; public string MsgSys { get { return MsgSysName; } set { MsgSysName = value; } } public Rabbit(string _msgSys) //Constructor { factory = new ConnectionFactory(); factory.HostName = "localhost"; connection = factory.CreateConnection(); channel = connection.CreateModel(); //consumer = new QueueingBasicConsumer(channel); System.Console.WriteLine("\nMsgSys: RabbitMQ"); MsgSys = _msgSys; } ~Rabbit() { //observer?? connection.Dispose(); //channel.Dispose(); System.Console.WriteLine("\nDestroying RABBIT"); } } 

//编辑2

 private List subscriptions = new List(); Subscription sub = null; public Measurement readMessage() { Measurement m = new Measurement(); foreach(Subscription element in subscriptions) { foreach (BasicDeliverEventArgs ev in element) { //ev = element.Next(); if( ev != null) { m = Measurement.AltDeSerialize( ev.Body ); return m; } m = null; } } System.Console.WriteLine("No message in the queue(s) at this time."); return m; } public void subscribeToQueue(string queueName) { sub = new Subscription(channel, queueName); subscriptions.Add(sub); } 

//编辑3

 //MessageHandler.cs public class MessageHandler { // Implementation of methods for Rabbit class go here private List publishQ = new List(); private List subscribeQ = new List(); ConnectionFactory factory = null; IConnection connection = null; IModel channel = null; QueueingBasicConsumer consumer = null; private List subscriptions = new List(); Subscription sub = null; public void writeMessage ( Measurement m1 ) { byte[] body = Measurement.AltSerialize( m1 ); //declare a queue if it doesn't exist publishToQueue(m1.id); channel.BasicPublish("amq.direct", m1.id, null, body); Console.WriteLine("\n [x] Sent to queue {0}.", m1.id); } public void publishToQueue(string queueName) { string finalQueueName = channel.QueueDeclare(queueName, true, false, false, null); channel.QueueBind(finalQueueName, "amq.direct", "", null); } public Measurement readMessage() { Measurement m = new Measurement(); foreach(Subscription element in subscriptions) { if( element.QueueName == null) { m = null; } else { BasicDeliverEventArgs ev = element.Next(); if( ev != null) { m = Measurement.AltDeSerialize( ev.Body ); m.id = element.QueueName; element.Ack(); return m; } m = null; } element.Ack(); } System.Console.WriteLine("No message in the queue(s) at this time."); return m; } public void subscribeToQueue(string queueName) { sub = new Subscription(channel, queueName); subscriptions.Add(sub); } public static string MsgSysName; public string MsgSys { get { return MsgSysName; } set { MsgSysName = value; } } public MessageHandler(string _msgSys) //Constructor { factory = new ConnectionFactory(); factory.HostName = "localhost"; connection = factory.CreateConnection(); channel = connection.CreateModel(); consumer = new QueueingBasicConsumer(channel); System.Console.WriteLine("\nMsgSys: RabbitMQ"); MsgSys = _msgSys; } public void disposeAll() { connection.Dispose(); channel.Dispose(); foreach(Subscription element in subscriptions) { element.Close(); } System.Console.WriteLine("\nDestroying RABBIT"); } } 

//App1.cs

 using System; using System.IO; using UtilityMeasurement; using UtilityMessageBus; public class MainClass { public static void Main() { MessageHandler obj1 = MessageHandler("Rabbit"); System.Console.WriteLine("\nA {0} object is now created.", MsgSysName); //Create new Measurement messages Measurement m1 = new Measurement("q1", 2345, 23.456); Measurement m2 = new Measurement("q2", 222, 33.33); System.Console.WriteLine("Test message 1:\n ID: {0}", m1.id); System.Console.WriteLine(" Time: {0}", m1.time); System.Console.WriteLine(" Value: {0}", m1.value); System.Console.WriteLine("Test message 2:\n ID: {0}", m2.id); System.Console.WriteLine(" Time: {0}", m2.time); System.Console.WriteLine(" Value: {0}", m2.value); // Ask queue name and store it System.Console.WriteLine("\nName of queue to publish to: "); string queueName = (System.Console.ReadLine()).ToString(); obj1.publishToQueue( queueName ); // Write message to the queue obj1.writeMessage( m1 ); System.Console.WriteLine("\nName of queue to publish to: "); string queueName2 = (System.Console.ReadLine()).ToString(); obj1.publishToQueue( queueName2 ); obj1.writeMessage( m2 ); obj1.disposeAll(); } } 

//App2.cs

 using System; using System.IO; using UtilityMeasurement; using UtilityMessageBus; public class MainClass { public static void Main() { //Asks for the message system System.Console.WriteLine("\nEnter name of messageing system: "); System.Console.WriteLine("Usage: [Rabbit] [Zmq]"); string MsgSysName = (System.Console.ReadLine()).ToString(); //Declare an IMessageBus instance: //Here, an object of the corresponding Message System // (ex. Rabbit, Zmq, etc) is instantiated IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName); System.Console.WriteLine("\nA {0} object is now created.", MsgSysName); //Create a new Measurement object m Measurement m = new Measurement(); System.Console.WriteLine("Queue name to subscribe to: "); string QueueName1 = (System.Console.ReadLine()).ToString(); obj1.subscribeToQueue( QueueName1 ); //Read message into m m = obj1.readMessage(); if (m != null ) { System.Console.WriteLine("\nMessage received from queue {0}:\n ID: {1}", m.id, m.id); System.Console.WriteLine(" Time: {0}", m.time); System.Console.WriteLine(" Value: {0}", m.value); } System.Console.WriteLine("Another queue name to subscribe to: "); string QueueName2 = (System.Console.ReadLine()).ToString(); obj1.subscribeToQueue( QueueName2 ); m = obj1.readMessage(); if (m != null ) { System.Console.WriteLine("\nMessage received from queue {0}:\n ID: {1}", m.id, m.id); System.Console.WriteLine(" Time: {0}", m.time); System.Console.WriteLine(" Value: {0}", m.value); } obj1.disposeAll(); } } 

两个信息来源:

  1. http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss

  2. 你应该首先尝试理解这些例子。

    • %Program Files%\ RabbitMQ \ DotNetClient \ examples \ src(基本示例)

    • 从他们的Mercurial存储库(c#项目)中获取完整的工作示例。

有用的操作来理解:

  • 声明/断言/收听/订阅/发布

回复:你的问题 – 没有理由你不能拥有多个听众。 或者您可以在“交换”上使用一个侦听器订阅n个路由路径。

** re:非阻塞**

典型的监听器一次消耗一条消息。 您可以将它们从队列中拉出,或者它们将以“窗口”方式(通过服务质量qos参数定义)自动放置在消费者附近。 这种方法的优点在于为您完成了许多艰苦的工作(重新:可靠性,保证交付等)。

RabbitMQ的一个关键特性是,如果处理中出现错误,则将消息重新添加回队列(容错function)。

需要了解更多关于你的情况。

通常,如果您发布到我上面提到的列表中,您可以在RabbitMQ上找到工作人员。 他们非常有帮助。

希望有所帮助。 一开始你的头脑很多,但值得坚持下去。


Q&A

请参阅: http : //www.rabbitmq.com/faq.html

问:您可以使用新订阅(频道,queueName)订阅多个队列吗?

是。 您可以使用绑定密钥,例如abc。*。hij,或abc。#。hij,或者您附加多个绑定。 前者假设您已经围绕某种对您有意义的原则设计了路由密钥(请参阅常见问题解答中的路由密钥)。 对于后者,您需要绑定到多个队列。

手动实现n绑定。 请参阅: http : //hg.rabbitmq.com/rabbitmq-dotnet-client/file/default/projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs

这种模式背后的代码不多,所以如果通配符不够,你可以推出自己的订阅模式。 你可以从这个类inheritance并添加另一个方法来进行额外的绑定……可能这会有效或者接近这个(未经测试)。

AQMP规范说可以进行多种手动绑定: http : //www.rabbitmq.com/amqp-0-9-1-reference.html#queue.bind

问:如果是这样,我如何通过所有订阅的队列并返回一条消息(没有消息时为null)?

对于订户,您会在有消息时收到通知。 否则,您所描述的是一个拉动界面,您可以根据请求提取消息。 如果没有可用的消息,您将根据需要获得null。 顺便说一句:Notify方法可能更方便。

问:哦,请注意,我用不同的方法完成了所有这些操作。 我将编辑我的post以反映代码

实时代码:

此版本必须使用通配符来订阅多个路由密钥

n使用订阅的手动路由键留给读者练习。 ;-)无论如何,我认为你倾向于拉接口。 btw:pull接口的效率低于通知接口。

  using (Subscription sub = new Subscription(ch, QueueNme)) { foreach (BasicDeliverEventArgs ev in sub) { Process(ev.Body); ... 

注意: foreach使用IEnumerable,IEnumerable通过“yield”语句包装新消息到达的事件。 实际上它是一个无限循环。

—更新

AMQP的设计理念是将TCP连接数保持在应用程序数量的最低位置,这意味着每个连接可以有多个通道。

这个问题中的代码(编辑3)尝试使用一个通道的两个订户,而它应该(我相信),每个通道每个通道一个订户,以避免锁定问题。 消化:使用路由密钥“通配符”。 可以使用java客户端订阅多个不同的队列名称,但据我所知,.net客户端在订阅者帮助程序类中实现了这一点。

如果您确实在同一个订阅线程上需要两个不同的队列名称,则建议为.net建议以下拉取序列:

  using (IModel ch = conn.CreateModel()) { // btw: no reason to close the channel afterwards IMO conn.AutoClose = true; // no reason to closs the connection either. Here for completeness. ch.QueueDeclare(queueName); BasicGetResult result = ch.BasicGet(queueName, false); if (result == null) { Console.WriteLine("No message available."); } else { ch.BasicAck(result.DeliveryTag, false); Console.WriteLine("Message:"); } return 0; } 

– 更新2:

来自RabbitMQ列表:

“假设element.Next()在其中一个订阅上阻塞。你可以从每个订阅中检索交付,并通过超时读取它。或者你可以设置一个队列来接收所有测量并从中检索消息。单一订阅。“ (埃米尔)

这意味着当第一个队列为空时,.Next()会阻塞等待下一条消息出现。 即订户内置了等待下一条消息。

– 更新3:

在.net下,使用QueueingBasicConsumer从多个队列中使用。

实际上,这是一个关于它的线索,以了解使用情况:

等待单个RabbitMQ消息超时

– 更新4:

关于.QueueingBasicConsumer的更多信息

这里有示例代码。

http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.QueueingBasicConsumer.html

示例通过一些修改复制到答案中(参见// <-----)。

  IModel channel = ...; QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queueName, false, null, consumer); //<----- channel.BasicConsume(queueName2, false, null, consumer); //<----- // etc. channel.BasicConsume(queueNameN, false, null, consumer); //<----- // At this point, messages will be being asynchronously delivered, // and will be queueing up in consumer.Queue. while (true) { try { BasicDeliverEventArgs e = (BasicDeliverEventArgs) consumer.Queue.Dequeue(); // ... handle the delivery ... channel.BasicAck(e.DeliveryTag, false); } catch (EndOfStreamException ex) { // The consumer was cancelled, the model closed, or the // connection went away. break; } } 

- 更新5:一个简单的get将作用于任何队列(一个较慢但有时更方便的方法)。

  ch.QueueDeclare(queueName); BasicGetResult result = ch.BasicGet(queueName, false); if (result == null) { Console.WriteLine("No message available."); } else { ch.BasicAck(result.DeliveryTag, false); Console.WriteLine("Message:"); // deserialize body and display extra info here. } 

最简单的方法是使用EventingBasicConsumer。 我在我的网站上有一个如何使用它的例子。 RabbitMQ EventingBasicConsumer

此Consumer类公开您可以使用的已接收事件,因此不会阻止。 其余代码基本保持不变。