在Azure Service Bus中并行处理消息

问题:我发送了大量电子邮件,目前,在任何时间点,队列中平均有10封电子邮件。 我一次处理一个队列的代码; 也就是说,接收消息,处理它并最终发送电子邮件。 这会导致在用户注册服务时向用户发送电子邮件的延迟相当大。

我已经开始考虑修改代码来异步process the messages in parrallel 5。 我想要编写一个方法并使用CTP并行调用这个方法,比如5次。

我对如何实现这一点感到有点迷茫。 制造错误的成本非常高,因为如果出现问题,用户会感到失望。

Request:我需要帮助编写并行处理Azure服务总线中的消息的代码。 谢谢。

 My code in a nutshell. Public .. Run() { _myQueueClient.BeginReceive(ProcessUrgentEmails, _myQueueClient); } void ProcessUrgentEmails(IAsyncResult result) { //casted the `result` as a QueueClient //Used EndReceive on an object of BrokeredMessage //I processed the message, then called sendEmail.BeginComplete(ProcessEndComplete, sendEmail); } //This method is never called despite having it as callback function above. void ProcessEndComplete(IAsyncResult result) { Trace.WriteLine("ENTERED ProcessEndComplete method..."); var bm = result.AsyncState as BrokeredMessage; bm.EndComplete(result); } 

此页面提供了使用Windows Azure Service Bus时的性能提示。

关于并行处理,您可以拥有一个用于处理的线程池,每次收到消息时,您只需抓取其中一个池并为其分配消息。 您需要管理该池。

或者,您可以一次检索多个消息并使用TPL处理它们…例如, BeginReceiveBatch / EndReceiveBatch方法允许您从Queue(异步)中检索多个“项目”,然后使用“AsParallel”转换返回的IEnumerable以前的方法和处理多个线程中的消息。

非常简单和BARE BONES样品:

 var messages = await Task.Factory.FromAsync>(Client.BeginReceiveBatch(3, null, null), Client.EndReceiveBatch); messages.AsParallel().WithDegreeOfParallelism(3).ForAll(item => { ProcessMessage(item); }); 

该代码从“3个线程”中检索队列和进程中的3条消息(注意:不保证它将使用3个线程,.NET将分析系统资源,如果需要,它将使用最多3个线程)

您还可以删除“ WithDegreeOfParallelism ”部分,.NET将使用它需要的任何线程。

在一天结束时,有多种方法可以做到,你必须决定哪一个更适合你。

更新:不使用ASYNC / AWAIT的样本

这是使用常规Begin / End Async模式的基本(无错误检查)示例。

 using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Net; using System.Threading; using Microsoft.ServiceBus; using Microsoft.ServiceBus.Messaging; using Microsoft.WindowsAzure; using Microsoft.WindowsAzure.ServiceRuntime; namespace WorkerRoleWithSBQueue1 { public class WorkerRole : RoleEntryPoint { // The name of your queue const string QueueName = "QUEUE_NAME"; const int MaxThreads = 3; // QueueClient is thread-safe. Recommended that you cache // rather than recreating it on every request QueueClient Client; bool IsStopped; int dequeueRequests = 0; public override void Run() { while (!IsStopped) { // Increment Request Counter Interlocked.Increment(ref dequeueRequests); Trace.WriteLine(dequeueRequests + " request(s) in progress"); Client.BeginReceive(new TimeSpan(0, 0, 10), ProcessUrgentEmails, Client); // If we have made too many requests, wait for them to finish before requesting again. while (dequeueRequests >= MaxThreads && !IsStopped) { System.Diagnostics.Trace.WriteLine(dequeueRequests + " requests in progress, waiting before requesting more work"); Thread.Sleep(2000); } } } void ProcessUrgentEmails(IAsyncResult result) { var qc = result.AsyncState as QueueClient; var sendEmail = qc.EndReceive(result); // We have received a message or has timeout... either way we decrease our counter Interlocked.Decrement(ref dequeueRequests); // If we have a message, process it if (sendEmail != null) { var r = new Random(); // Process the message Trace.WriteLine("Processing message: " + sendEmail.MessageId); System.Threading.Thread.Sleep(r.Next(10000)); // Mark it as completed sendEmail.BeginComplete(ProcessEndComplete, sendEmail); } } void ProcessEndComplete(IAsyncResult result) { var bm = result.AsyncState as BrokeredMessage; bm.EndComplete(result); Trace.WriteLine("Completed message: " + bm.MessageId); } public override bool OnStart() { // Set the maximum number of concurrent connections ServicePointManager.DefaultConnectionLimit = 12; // Create the queue if it does not exist already string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString"); var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString); if (!namespaceManager.QueueExists(QueueName)) { namespaceManager.CreateQueue(QueueName); } // Initialize the connection to Service Bus Queue Client = QueueClient.CreateFromConnectionString(connectionString, QueueName); IsStopped = false; return base.OnStart(); } public override void OnStop() { // Waiting for all requestes to finish (or timeout) before closing while (dequeueRequests > 0) { System.Diagnostics.Trace.WriteLine(dequeueRequests + " request(s), waiting before stopping"); Thread.Sleep(2000); } // Close the connection to Service Bus Queue IsStopped = true; Client.Close(); base.OnStop(); } } } 

希望能帮助到你。