重新启动消息队列服务后,服务不接收消息

我们有一个服务从n个消息队列接收消息。 但是,如果重新启动消息队列服务,即使消息队列服务已成功重新启动,消息检索服务也会停止接收消息。

我试图专门捕获消息检索服务中抛出的MessageQueueException并再次调用队列的BeginReceive方法。 但是,在消息队列服务重新启动的2秒钟内,我得到大约1875个exception实例,然后当我们的StartListening方法中抛出另一个MessageQueueException时,服务停止运行。

有没有一种优雅的方法从消息队列服务重新启动恢复?

private void OnReceiveCompleted(object sender, ReceiveCompletedEventArgs e) { MessageQueue queue = (MessageQueue)sender; try { Message message = queue.EndReceive(e.AsyncResult); this.StartListening(queue); if (this.MessageReceived != null) this.MessageReceived(this, new MessageReceivedEventArgs(message)); } catch (MessageQueueException) { LogUtility.LogError(String.Format(CultureInfo.InvariantCulture, StringResource.LogMessage_QueueManager_MessageQueueException, queue.MachineName, queue.QueueName, queue.Path)); this.StartListening(queue); } } public void StartListening(MessageQueue queue) { queue.BeginReceive(); } 

我需要处理这个导致的无限循环问题并清理一下但是你明白了。

发生MessageQueueException时,调用RecoverQueue方法。

  private void RecoverQueue(MessageQueue queue) { string queuePath = queue.Path; bool queueRecovered = false; while (!queueRecovered) { try { this.StopListening(queue); queue.Close(); queue.Dispose(); Thread.Sleep(2000); MessageQueue newQueue = this.CreateQueue(queuePath); newQueue.ReceiveCompleted += new ReceiveCompletedEventHandler(this.OnReceiveCompleted); this.StartListening(newQueue); LogUtility.LogInformation(String.Format(CultureInfo.InvariantCulture, "Message queue {0} recovered successfully.", newQueue.QueueName)); queueRecovered = true; } catch (Exception ex) { LogUtility.LogError(String.Format(CultureInfo.InvariantCulture, "The following error occurred while trying to recover queue: {0} error: {1}", queue.QueueName, ex.Message)); } } } public void StopListening(MessageQueue queue) { queue.ReceiveCompleted -= new ReceiveCompletedEventHandler(this.OnReceiveCompleted); } 

收到服务重新启动后的exception时,您必须释放旧的MessageQueue ,即取消接收ReceiveCompleted事件,处理MessageQueue等。然后创建MessageQueue的新实例并再次连接到ReceiveCompleted事件。新的MessageQueue实例。

或者,您可以使用在特定时间间隔内创建新实例的轮询方法,调用MessageQueue.Receive(TimeSpan) ,等待传入消息或直到超时发生。 在这种情况下,您处理消息并销毁MessageQueue实例并再次启动迭代。

通过每次重新创建MessageQueue ,您可以确保内置恢复。 此外,由于底层队列的内部缓存,创建MessageQueue的开销很小。

伪代码…

 while (!notDone)// or use a timer or periodic task of some sort... { try { using (MessageQueue queue = new MessageQueue(queuePath)) { Message message = queue.Receive(TimeSpan.FromMilliseconds(500)); // process message } } catch (MessageQueueException ex) { // handle exceptions } }