Tag: message queue

使用MassTransit使用交换和路由密钥发布消息

我一直在看MassTransit几个星期了,我很好奇这些可能性。 但是,我似乎无法将这些概念完全正确。 预期的行为我想用路由密钥将消息发布到“直接”交换,路由密钥绑定到两个不同的队列以执行其他活动。 当我使用MassTransit尝试相同的逻辑以获得更好的可伸缩性时。 我发现MassTransit基于具有扇出类型的队列名称创建自己的交换。 通过交换和路由密钥发布消息的经典代码 using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange, “direct”); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange, routingKey, null, body); Console.WriteLine(” [x] Sent {0}”, message); } } 有没有办法在MassTransit中使用routingkey配置直接或主题交换?

MessageQueue和Async / Await

我只想以异步方式接收我的消息! 它冻结了我的UI public async void ProcessMessages() { MessageQueue MyMessageQueue = new MessageQueue(@”.\private$\MyTransactionalQueue”); MyMessageQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) }); while (true) { MessageQueueTransaction MessageQueueTransaction = new MessageQueueTransaction(); MessageQueueTransaction.Begin(); ContainError = false; ProcessPanel.SetWaiting(); string Body = MyMessageQueue.Receive(MessageQueueTransaction).Body.ToString(); //Do some process with body string. MessageQueueTransaction.Commit(); } } 我只是像任何常规方法一样调用方法,并且它的工作正常! 当我使用BackgroundWorkers而不是async / await时,此代码曾经工作 想法?

如何并行读取队列中的消息?

情况 我们有一个消息队列。 我们希望并行处理消息并限制同时处理的消息的数量。 我们下面的试用代码会并行处理消息,但只有在前一个进程完成时才开始新的一批进程。 我们想在完成后重启任务。 换句话说:只要消息队列不为空,任务的最大数量应始终处于活动状态。 试用代码 static string queue = @”.\Private$\concurrenttest”; private static void Process(CancellationToken token) { Task.Factory.StartNew(async () => { while (true) { IEnumerable consumerTasks = ConsumerTasks(); await Task.WhenAll(consumerTasks); await PeekAsync(new MessageQueue(queue)); } }); } private static IEnumerable ConsumerTasks() { for (int i = 0; i { Console.WriteLine(“id: ” + message.id + “, […]

使用EasyNetQ for RabbitMQ时的自定义错误队列名称?

我没有将未处理的exception放入EasyNetQ_Default_Error_Queue,而是想知道是否有一种方法可以显式地说明应该用于给定应用程序的错误队列的名称,因此错误不会在这一个EasyNetQ_Default_Error_Queue中结束 ? 我可以看到如何指定常规消息队列名称,但没有找到任何有关错误队列名称的信息。

带有C#和Apache NMS的ActiveMQ – 计算队列中的消息

我正在使用ActiveMQ使用C#应用程序发送和接收消息。 但是我只是在计算队列中的消息时遇到了一些困难。这是我的代码: public int GetMessageCount() { int messageCount = 0; Uri connecturi = new Uri(this.ActiveMQUri); IConnectionFactory factory = new NMSConnectionFactory(connecturi); using (IConnection connection = factory.CreateConnection()) using (ISession session = connection.CreateSession()) { IDestination requestDestination = SessionUtil.GetDestination(session, this.QueueRequestUri); IQueueBrowser queueBrowser = session.CreateBrowser((IQueue)requestDestination); IEnumerator messages = queueBrowser.GetEnumerator(); while(messages.MoveNext()) { messageCount++; } connection.Close(); session.Close(); connection.Close(); } return messageCount; } […]

消息队列错误:找不到能够读取消息的格式化程序

我正在使用C#将消息写入消息队列,如下所示: queue.Send(new Message(“message”)); 我正在尝试阅读如下消息: Messages messages = queue.GetAllMessages(); foreach(Message m in messages) { String message = m.Body; //do something with string } 但是我收到一条错误消息,上面写着:“找不到能够读取此消息的格式化程序。” 我究竟做错了什么?

Rabbitmq Ack或Nack,将消息留在队列中

我一直在玩RabbitMq.net和消息致谢。 如果消费者能够处理消息,您可以以forms发送回ack channel.BasicAck(ea.DeliveryTag, false); 这将把它从队列中删除。 但是如果邮件无法处理呢? 也许是暂时中断,你不希望从队列中取出的消息只是放在后面并继续下一条消息? 我试过用 channel.BasicNack(ea.DeliveryTag, false, true); 但下一次它仍然得到相同的消息而不是移动到队列中的下一条消息 我的完整代码是 class Program { private static IModel channel; private static QueueingBasicConsumer consumer; private static IConnection Connection; static void Main(string[] args) { Connection = GetRabbitMqConnection(); channel = Connection.CreateModel(); channel.BasicQos(0, 1, false); consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(“SMSQueue”, false, consumer); while (true) { if (!channel.IsOpen) { […]