RabbitMQ 3.5和消息优先级

RabbitMQ 3.5现在支持消息优先级 ; 但是,我无法建立一个有效的例子。 我把我的代码放在下面。 它包括我期望的输出和实际的输出。 我会对更多文档和/或工作示例感兴趣。

所以我的问题很简单:如何在Rabbit 3.5.0.0中使用消息优先级?

出版商:

using System; using RabbitMQ.Client; using System.Text; using System.Collections.Generic; class Publisher { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { IDictionary  args = new Dictionary() ; args.Add(" x-max-priority ", 10); channel.QueueDeclare("task_queue1", true, false, true, args); for (int i = 1 ; i<=10; i++ ) { var message = "Message"; var body = Encoding.UTF8.GetBytes(message + " " + i); var properties = channel.CreateBasicProperties(); properties.SetPersistent(true); properties.Priority = Convert.ToByte(i); channel.BasicPublish("", "task_queue1", properties, body); } } } } } 

消费者:

 using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; using System.Threading; using System.Collections.Generic; namespace Consumer { class Worker { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { IDictionary args = new Dictionary(); channel.BasicQos(0, 1, false); var consumer = new QueueingBasicConsumer(channel); IDictionary consumerArgs = new Dictionary(); channel.BasicConsume( "task_queue1", false, "", args, consumer); Console.WriteLine(" [*] Waiting for messages. " + "To exit press CTRL+C"); while (true) { var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); channel.BasicAck(ea.DeliveryTag, false); } } } } } } 

实际产量:

 [*] Waiting for messages. To exit press CTRL+C [x] Received Message 1 [x] Received Message 2 [x] Received Message 3 [x] Received Message 4 [x] Received Message 5 [x] Received Message 6 [x] Received Message 7 [x] Received Message 8 [x] Received Message 9 [x] Received Message 10 

预期产量:

 [*] Waiting for messages. To exit press CTRL+C [x] Received Message 10 [x] Received Message 9 [x] Received Message 8 [x] Received Message 7 [x] Received Message 6 [x] Received Message 5 [x] Received Message 4 [x] Received Message 3 [x] Received Message 2 [x] Received Message 1 

更新#1。 我在Java中找到了一个例子。 然而,它是Rabbit 3.4.xx插件,并入3.5。 我能看到的唯一区别是它们将优先级表示为int,而mine是一个字节。 但我觉得这是一个红鲱鱼。 我在这里有点不知所措。

好吧,我解决了。 这是一个愚蠢的错误。 我写:

 args.Add(" x-max-priority ", 10); 

应该是的

 args.Add("x-max-priority", 10); 

我会把它留下来,以便其他人可以在C#中有一个Rabbitmq 3.5的优先级队列的工作示例。

Node JS中类似的RabbitMq优先级队列实现

安装amqplib

为了测试,我们需要安装amqplib

 npm install amqplib 

发布者(send.js)

 #!/usr/bin/env node var amqp = require('amqplib/callback_api'); function bail(err, conn) { console.error(err); if (conn) conn.close(function() { process.exit(1); }); } function on_connect(err, conn) { if (err !== null) return bail(err); // name of queue var q = 'hello'; var msg = 'Hello World!'; var priorityValue = 0; function on_channel_open(err, ch) { if (err !== null) return bail(err, conn); // maxPriority : max priority value supported by queue ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) { if (err !== null) return bail(err, conn); for(var index=1; index<=100; index++) { priorityValue = Math.floor((Math.random() * 10)); msg = 'Hello World!' + ' ' + index + ' ' + priorityValue; ch.publish('', q, new Buffer(msg), {priority: priorityValue}); console.log(" [x] Sent '%s'", msg); } ch.close(function() { conn.close(); }); }); } conn.createChannel(on_channel_open); } amqp.connect(on_connect); 

订阅者(receive.js)

 #!/usr/bin/env node var amqp = require('amqplib/callback_api'); function bail(err, conn) { console.error(err); if (conn) conn.close(function() { process.exit(1); }); } function on_connect(err, conn) { if (err !== null) return bail(err); process.once('SIGINT', function() { conn.close(); }); var q = 'hello'; function on_channel_open(err, ch) { ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) { if (err !== null) return bail(err, conn); ch.consume(q, function(msg) { // message callback console.log(" [x] Received '%s'", msg.content.toString()); }, {noAck: true}, function(_consumeOk) { // consume callback console.log(' [*] Waiting for messages. To exit press CTRL+C'); }); }); } conn.createChannel(on_channel_open); } amqp.connect(on_connect); 

跑:

 node send.js 

它将创建一个名为“hello”的队列,并使用默认的AMQP交换将其填充为“1000”样本消息。

 node receive.js 

它将作为消费者来订阅队列中等待的消息。

另一种可能性(对于未来的搜索者)

消息传递的“推送”方法似乎不尊重优先级。

http://rabbitmq.docs.pivotal.io/35/rabbit-web-docs/dotnet-api-guide.html.html

以下是上述URL的引用。 我加粗了重要的一部分。

通过订阅检索邮件(“推送API”)

接收消息的另一种方法是使用IBasicConsumer接口设置订阅。 然后,消息将在到达时自动传递,而不是必须主动请求。 实现消费者的一种方法是使用便利类EventingBasicConsumer,它将交付和其他消费者生命周期事件作为C#事件发送:

 var consumer = new EventingBasicConsumer(channel); consumer.Received += (ch, ea) => { var body = ea.Body; // ... process the message ch.BasicAck(ea.DeliveryTag, false); }; String consumerTag = channel.BasicConsume(queueName, false, consumer); 

通过更改为“拉”方法,优先级似乎得到尊重。 但是,在下面引文中 (来自上面的相同url),看起来有一个权衡(我加粗了

获取单个消息(“pull API”)要检索单个消息,请使用IModel.BasicGet。 返回的值是BasicGetResult的一个实例,可以从中提取头信息(属性)和消息体:

 bool noAck = false; BasicGetResult result = channel.BasicGet(queueName, noAck); if (result == null) { // No message available at this time. } else { IBasicProperties props = result.BasicProperties; byte[] body = result.Body; ... 

由于上面的noAck = false,您还必须调用IModel.BasicAck以确认您已成功接收并处理了该消息:

  ... // acknowledge receipt of the message channel.BasicAck(result.DeliveryTag, false); } 

请注意,使用此API获取消息的效率相对较低。 如果您更喜欢RabbitMQ将消息推送到客户端,请参阅下一节。

(在这种情况下,“下一个”部分将带您进入本文顶部的“推送”方法)