Azure Service Bus – 使用OnMessage()方法接收消息

在MS文档之后,从订阅接收消息并不困难。 但是,如果我希望我的应用程序每次发布新消息时都会收到消息 – 持续轮询。 因此, SubscriptionClient类的OnMessage()方法。

MS文档说:……当调用OnMessage时,客户端启动一个内部消息泵,不断轮询队列或订阅。这个消息泵包含一个发出Receive()调用的无限循环。如果调用超时,它发出下一个Receive()调用….

但是当应用程序运行时,调用OnMessage()方法时,只接收最新消息。 发布新消息时,常量轮询似乎不起作用。 在尝试了许多不同的方法之后,我可以使这项工作成为唯一的方法并让应用程序在收到新消息时做出反应,即将代码放入具有无限循环的单独任务中。 在如此多的层面上,这似乎完全错了! (见下面的代码)。

任何人都可以帮我修正我的代码或发布一个工作样本来完成相同的function而不需要循环吗? 谢谢!

public void ReceiveMessageFromSubscription(string topicName, string subscriptionFilter) { var newMessage = new MessageQueue(); int i = 0; Task listener = Task.Factory.StartNew(() => { while (true) { SubscriptionClient Client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionFilter); Dictionary retrievedMessage = new Dictionary(); OnMessageOptions options = new OnMessageOptions(); options.AutoComplete = false; options.AutoRenewTimeout = TimeSpan.FromMinutes(1); Client.OnMessage((message) => { try { retrievedMessage.Add("messageGuid", message.Properties["MessageGuid"].ToString()); retrievedMessage.Add("instanceId", message.Properties["InstanceId"].ToString()); retrievedMessage.Add("pId", message.Properties["ProcessId"].ToString()); retrievedMessage.Add("processKey", message.Properties["ProcessKey"].ToString()); retrievedMessage.Add("message", message.Properties["Message"].ToString()); newMessage.AnnounceNewMessage(retrievedMessage); // event -> message.Complete(); // Remove message from subscription. } catch (Exception ex) { string exmes = ex.Message; message.Abandon(); } }, options); retrievedMessage.Clear(); i++; Thread.Sleep(3000); } }); } 

您的代码有一些问题要解决 –

  • 它失败了,我假设您的应用程序然后退出 – 或者至少正在侦听消息的线程终止。
  • 你的while循环不断重复代码来连接消息处理程序,你只需要执行一次。
  • 您需要一种方法来保持调用堆栈的活动,并防止您的应用程序垃圾收集您的对象。

下面应该会看到你走向成功的道路。 祝好运。

  ManualResetEvent CompletedResetEvent = new ManualResetEvent(false); SubscriptionClient Client; public void ReceiveMessagesFromSubscription(string topicName, string subscriptionFilter, string connectionString) { Task listener = Task.Factory.StartNew(() => { // You only need to set up the below once. Client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionFilter); OnMessageOptions options = new OnMessageOptions(); options.AutoComplete = false; options.AutoRenewTimeout = TimeSpan.FromMinutes(1); options.ExceptionReceived += LogErrors; Client.OnMessage((message) => { try { Trace.WriteLine("Got the message with ID {0}", message.MessageId); message.Complete(); // Remove message from subscription. } catch (Exception ex) { Trace.WriteLine("Exception occurred receiving a message: {0}" + ex.ToString()); message.Abandon(); // Failed. Leave the message for retry or max deliveries is exceeded and it dead letters. } }, options); CompletedResetEvent.WaitOne(); }); } ///  /// Added in rudimentary exception handling . ///  /// The sender. /// The  instance containing the event data. private void LogErrors(object sender, ExceptionReceivedEventArgs ex) { Trace.WriteLine("Exception occurred in OnMessage: {0}" + ex.ToString()); } ///  /// Call this to stop the messages arriving from subscription. ///  public void StopMessagesFromSubscription() { Client.Close(); // Close the message pump down gracefully CompletedResetEvent.Set(); // Let the execution of the listener complete and terminate gracefully } 

或者,您可以使用ReceiveBatch以更传统的方式将消息分块:

 var messages = await queueClient.ReceiveBatchAsync(10, TimeSpan.FromSeconds(30), cancellationToken);