调用OnMessage()后自动处理BrokeredMessage

我正在尝试从Azure Service Bus排队项目,以便我可以批量处理它们。 我知道Azure Service Bus有一个ReceiveBatch()但由于以下原因似乎有问题:

  • 我一次最多只能获得256条消息,然后根据消息大小,这可能是随机的。
  • 即使我查看有多少消息在等待,我也不知道有多少RequestBatch调用,因为我不知道每次调用会给我多少消息。 由于消息将继续存在,我不能继续发出请求,直到它为空,因为它永远不会是空的。

我决定只使用比浪费偷看更便宜的消息监听器,并且会给我更多的控制权。

基本上我试图让一定数量的消息建立,然后立即处理它们。 我使用计时器强制延迟,但我需要能够在他们进来时对我的物品进行排队。

基于我的计时器要求,似乎阻塞集合不是一个好选项,所以我试图使用ConcurrentBag。

var batchingQueue = new ConcurrentBag(); myQueueClient.OnMessage((m) => { Console.WriteLine("Queueing message"); batchingQueue.Add(m); }); while (true) { var sw = WaitableStopwatch.StartNew(); BrokeredMessage msg; while (batchingQueue.TryTake(out msg)) // <== Object is already disposed { ...do this until I have a thousand ready to be written to DB in batch Console.WriteLine("Completing message"); msg.Complete(); // <== ERRORS HERE } sw.Wait(MINIMUM_DELAY); } 

但是,只要我在OnMessage管道之外访问消息,它就会显示BrokeredMessage已经被处理掉了。

我认为这必须是OnMessage的一些自动行为,我没有看到任何方式对消息做任何事情,除了立即处理它我不想做。

使用BlockingCollection非常容易。

 var batchingQueue = new BlockingCollection(); myQueueClient.OnMessage((m) => { Console.WriteLine("Queueing message"); batchingQueue.Add(m); }); 

和你的消费者线程:

 foreach (var msg in batchingQueue.GetConsumingEnumerable()) { Console.WriteLine("Completing message"); msg.Complete(); } 

GetConsumingEnumerable返回一个迭代器,该迭代器使用队列中的项目,直到设置了IsCompleted属性并且队列为空。 如果队列为空但IsCompletedFalse ,则会执行非忙等待下一个项目。

要取消使用者线程(即关闭程序),您将停止向队列添加内容并让主线程调用batchingQueue.CompleteAdding 。 使用者将清空队列,看到IsCompleted属性为True ,然后退出。

在这里使用BlockingCollectionConcurrentBagConcurrentQueue更好,因为BlockingCollection接口更容易使用。 特别是,使用GetConsumingEnumerable可以使您不必担心检查计数或进行忙等待(轮询循环)。 它只是有效。

另请注意, ConcurrentBag有一些相当奇怪的删除行为。 特别是,删除项目的顺序根据哪个线程删除项目而不同。 创建包的线程以与其他线程不同的顺序删除项目。 有关详细信息,请参阅使用ConcurrentBag Collection 。

您还没有说明为什么要在输入中批量处理项目。 除非有一个压倒一切的性能原因,否则使用该批处理逻辑复杂化代码似乎并不是一个特别好的主意。


如果要对数据库进行批量写入,那么我建议使用简单的List来缓冲项目。 如果必须在将项目写入数据库之前对其进行处理,请使用上面显示的技术来处理它们。 然后,而是直接写入数据库,将项目添加到列表中。 当列表获得1,000个项目或经过一定时间后,分配新列表并启动任务以将旧列表写入数据库。 像这样:

 // at class scope // Flush every 5 minutes. private readonly TimeSpan FlushDelay = TimeSpan.FromMinutes(5); private const int MaxBufferItems = 1000; // Create a timer for the buffer flush. System.Threading.Timer _flushTimer = new System.Threading.Timer(TimedFlush, FlushDelay.TotalMilliseconds, Timeout.Infinite); // A lock for the list. Unless you're getting hundreds of thousands // of items per second, this will not be a performance problem. object _listLock = new Object(); List _recordBuffer = new List(); 

然后,在您的消费者中:

 foreach (var msg in batchingQueue.GetConsumingEnumerable()) { // process the message Console.WriteLine("Completing message"); msg.Complete(); lock (_listLock) { _recordBuffer.Add(msg); if (_recordBuffer.Count >= MaxBufferItems) { // Stop the timer _flushTimer.Change(Timeout.Infinite, Timeout.Infinite); // Save the old list and allocate a new one var myList = _recordBuffer; _recordBuffer = new List(); // Start a task to write to the database Task.Factory.StartNew(() => FlushBuffer(myList)); // Restart the timer _flushTimer.Change(FlushDelay.TotalMilliseconds, Timeout.Infinite); } } } private void TimedFlush() { bool lockTaken = false; List myList = null; try { if (Monitor.TryEnter(_listLock, 0, out lockTaken)) { // Save the old list and allocate a new one myList = _recordBuffer; _recordBuffer = new List(); } } finally { if (lockTaken) { Monitor.Exit(_listLock); } } if (myList != null) { FlushBuffer(myList); } // Restart the timer _flushTimer.Change(FlushDelay.TotalMilliseconds, Timeout.Infinite); } 

这里的想法是,您将旧列表排除在外,分配新列表以便继续处理,然后将旧列表的项目写入数据库。 锁是为了防止计时器和记录计数器相互踩踏。 如果没有锁定,事情可能会在一段时间内正常运行,然后在不可预测的时间内发生奇怪的崩溃。

我喜欢这种设计,因为它消除了消费者的轮询。 我唯一不喜欢的是消费者必须知道计时器(即它必须停止然后重新启动计时器)。 稍微考虑一下,我可以消除这个要求。 但它的编写方式很好。

切换到OnMessageAsync解决了我的问题

 _queueClient.OnMessageAsync(async receivedMessage => 

我联系了Microsoft关于在MSDN上处理问题的BrokeredMessage,这是响应:

非常基本的规则,我不确定这是否记录在案。 收到的消息需要在回调函数的生命周期中处理。 在您的情况下,将在异步回调完成时处理消息,这就是您在另一个线程中使用ObjectDisposedException失败的完整尝试的原因。

我真的没有看到进一步处理的排队消息如何有助于吞吐量。 这肯定会给客户增加更多负担。 尝试在异步回调中处理消息,该消息应该足够高效。

在我的情况下,这意味着我不能以我想要的方式使用ServiceBus,我必须重新思考我希望如何工作。 开溜。

我开始使用Azure Service Bus服务时遇到了同样的问题。

我发现方法OnMessage总是处理BrokedMessage对象。 Jim Mischel提出的方法对我没有帮助(但阅读非常有趣 – 谢谢!)。

经过一番调查后,我发现整个方法都是错误的。 让我解释一下你想要的正确方法。

  1. 仅在OnMessage方法处理程序中使用BrokedMessage.Complete()方法。
  2. 如果需要在此方法之外处理消息,则应使用方法QueueClient.Complete(Guid lockToken)。 “LockToken”是BrokeredMessage对象的属性。

例:

  var messageOptions = new OnMessageOptions { AutoComplete = false, AutoRenewTimeout = TimeSpan.FromMinutes( 5 ), MaxConcurrentCalls = 1 }; var buffer = new Dictionary(); // get message from queue myQueueClient.OnMessage( m => buffer.Add(key: m.GetBody(), value: m.LockToken), messageOptions // this option says to ServiceBus to "froze" message in he queue until we process it ); foreach(var item in buffer){ try { Console.WriteLine($"Process item: {item.Key}"); myQueueClient.Complete(item.Value);// you can also use method CompleteBatch(...) to improve performance } catch{ // "unfroze" message in ServiceBus. Message would be delivered to other listener myQueueClient.Defer(item.Value); } }