Azure ServiceBus和async – 是或不是?

我正在Azure上运行Service Bus, 每秒抽取大约10-100条消息

最近我转而使用.net 4.5并且所有兴奋重构的所有代码都在每行中至少有两次‘async’和’await ‘,以确保它’正确’完成’:)

现在我想知道它实际上是好还是坏 。 如果你能看一下代码片段,请告诉我你的想法。 我特别担心,如果线程上下文切换没有给我带来更多的悲伤而不是从所有异步中获益……(看看!dumpheap肯定是一个因素)

只是一点描述 – 我将发布2个方法 – 一个在ConcurrentQueue上执行while循环,等待新消息和另一个一次发送一条消息的方法。 我也正如Azure博士所规定的那样使用Transient Fault Handling块。

发送循环(从头开始,等待新消息):

private async void SendingLoop() { try { await this.RecreateMessageFactory(); this.loopSemaphore.Reset(); Buffer message = null; while (true) { if (this.cancel.Token.IsCancellationRequested) { break; } this.semaphore.WaitOne(); if (this.cancel.Token.IsCancellationRequested) { break; } while (this.queue.TryDequeue(out message)) { try { using (message) { //only take send the latest message if (!this.queue.IsEmpty) { this.Log.Debug("Skipping qeued message, Topic: " + message.Value.Topic); continue; } else { if (this.Topic == null || this.Topic.Path != message.Value.Topic) await this.EnsureTopicExists(message.Value.Topic, this.cancel.Token); if (this.cancel.Token.IsCancellationRequested) break; await this.SendMessage(message, this.cancel.Token); } } } catch (OperationCanceledException) { break; } catch (Exception ex) { ex.LogError(); } } } } catch (OperationCanceledException) { } catch (Exception ex) { ex.LogError(); } finally { if (this.loopSemaphore != null) this.loopSemaphore.Set(); } } 

发送消息:

 private async Task SendMessage(Buffer message, CancellationToken cancellationToken) { //this.Log.Debug("MessageBroadcaster.SendMessage to " + this.GetTopic()); bool entityNotFound = false; if (this.MessageSender.IsClosed) { //this.Log.Debug("MessageBroadcaster.SendMessage MessageSender closed, recreating " + this.GetTopic()); await this.EnsureMessageSender(cancellationToken); } try { await this.sendMessageRetryPolicy.ExecuteAsync(async () => { message.Value.Body.Seek(0, SeekOrigin.Begin); using (var msg = new BrokeredMessage(message.Value.Body, false)) { await Task.Factory.FromAsync(this.MessageSender.BeginSend, this.MessageSender.EndSend, msg, null); } }, cancellationToken); } catch (MessagingEntityNotFoundException) { entityNotFound = true; } catch (OperationCanceledException) { } catch (ObjectDisposedException) { } catch (Exception ex) { ex.LogError(); } if (entityNotFound) { if (!cancellationToken.IsCancellationRequested) { await this.EnsureTopicExists(message.Value.Topic, cancellationToken); } } } 

上面的代码来自发送1个消息/秒的’Sender’类。 我在任何给定的时间都有大约50-100个实例运行,所以它可能是相当多的线程。

顺便说一句,不要担心EnsureMessageSender,RecreateMessageFactory,EnsureTopicExists太多,它们不经常被调用。

如果只需要一个后台线程在消息队列中工作并同步发送消息,我是不是更好,只要我需要的是一次发送一条消息,不用担心异步的东西并避免它带来的开销。

请注意,通常将一条消息发送到Azure Service Bus只需几毫秒,但这并不贵。 (除非它很慢,超时或Service Bus后端出现问题,它可能会暂停一段时间尝试发送内容)。

谢谢和抱歉长篇文章,

斯特沃

提出的解决方案

这个例子可以解决我的情况吗?

 static void Main(string[] args) { var broadcaster = new BufferBlock(); //queue var cancel = new CancellationTokenSource(); var run = Task.Run(async () => { try { while (true) { //check if we are not finished if (cancel.IsCancellationRequested) break; //async wait until a value is available var val = await broadcaster.ReceiveAsync(cancel.Token).ConfigureAwait(false); int next = 0; //greedy - eat up and ignore all the values but last while (broadcaster.TryReceive(out next)) { Console.WriteLine("Skipping " + val); val = next; } //check if we are not finished if (cancel.IsCancellationRequested) break; Console.WriteLine("Sending " + val); //simulate sending delay await Task.Delay(1000).ConfigureAwait(false); Console.WriteLine("Value sent " + val); } } catch (Exception ex) { Console.WriteLine(ex); } }, cancel.Token); //simulate sending messages. One every 200mls for (int i = 0; i < 20; i++) { Console.WriteLine("Broadcasting " + i); broadcaster.Post(i); Thread.Sleep(200); } cancel.Cancel(); run.Wait(); } 

你说:

上面的代码来自发送1个消息/秒的’Sender’类。 我在任何给定的时间都有大约50-100个实例运行,所以它可能是相当多的线程。

这是异步的一个很好的例子。 你在这里节省了很multithreading。 Async 减少了上下文切换,因为它不是基于线程的。 在需要等待的情况下,它不会进行上下文切换。 而是在同一个线程上处理下一个工作项(如果有的话)。

因此,异步解决方案肯定比同步解决方案更好。 是否需要测量在工作流程的50-100个实例中实际使用较少的CPU。 实例越多,异步更快的概率就越高。

现在,实现存在一个问题:您正在使用不是异步准备的ConcurrentQueue 。 因此,即使在异步版本中,您实际上也使用了50-100个线程。 它们会阻塞(你想要避免)或忙 – 等待烧掉100%的CPU(在你的实现中似乎就是这种情况!)。 您需要摆脱这个问题并使排队异步。 也许SemaphoreSlim在这里有所帮助,因为它可以异步等待。

首先,请记住Task != Thread 。 任务(和async方法延续)被安排到线程池,只要你的任务很短,微软已经进行了大量的优化工作。

查看代码,一行引发一个标志: semaphore.WaitOne 。 我假设你使用它作为一种信号,队列中有数据可用。 这很糟糕,因为它是async方法中的阻塞等待。 通过使用阻塞等待,代码从轻量级延续变为更重的线程池线程。

所以,我会遵循@ usr的建议,用async准备队列替换队列(和信号量)。 TPL Dataflow的BufferBlock是一个可通过NuGet获得的async生产者/消费者队列。 我首先推荐这个,因为听起来你的项目可以比使用数据流更广泛地受益于队列(但队列是一个很好的起点)。

存在其他async数据结构; 我的AsyncEx库有几个。 自己建造一个简单的也不难; 我有关于这个主题的博客文章 。 但我建议您使用TPL Dataflow。