Tag: 总线

调用OnMessage()后自动处理BrokeredMessage

我正在尝试从Azure Service Bus排队项目,以便我可以批量处理它们。 我知道Azure Service Bus有一个ReceiveBatch()但由于以下原因似乎有问题: 我一次最多只能获得256条消息,然后根据消息大小,这可能是随机的。 即使我查看有多少消息在等待,我也不知道有多少RequestBatch调用,因为我不知道每次调用会给我多少消息。 由于消息将继续存在,我不能继续发出请求,直到它为空,因为它永远不会是空的。 我决定只使用比浪费偷看更便宜的消息监听器,并且会给我更多的控制权。 基本上我试图让一定数量的消息建立,然后立即处理它们。 我使用计时器强制延迟,但我需要能够在他们进来时对我的物品进行排队。 基于我的计时器要求,似乎阻塞集合不是一个好选项,所以我试图使用ConcurrentBag。 var batchingQueue = new ConcurrentBag(); myQueueClient.OnMessage((m) => { Console.WriteLine(“Queueing message”); batchingQueue.Add(m); }); while (true) { var sw = WaitableStopwatch.StartNew(); BrokeredMessage msg; while (batchingQueue.TryTake(out msg)) // <== Object is already disposed { …do this until I have a thousand ready to be written […]

从Azure主题接收消息产生null,主题具有未读消息

昨天我能够重复阅读我自己创建的Azure Service Bus主题订阅中的消息。 然而,今天,虽然使用与昨天相同的代码,但是尽管订阅的消息计数远高于0,但所有消息都为空。让我详细说明: 按照MSDN网站上有关如何从订阅中读取消息的说明 ,我使用以下内容从订阅客户端读取消息: var tokenProvider = TokenProvider.CreateSharedSecretTokenProvider(“MySubscriber”,”MyKey”); var serviceUri = ServiceBusEnvironment.CreateServiceUri(“sb”, “MyNamespace”, string.Empty); var messagingFactory = MessagingFactory.Create(serviceUri, tokenProvider); var subscriptionClient = messagingFactory.CreateSubscriptionClient(“MyTopicName”, “MySubscriptionName”, ReceiveMode.PeekLock); var brokeredMessage = subscriptionClient.Receive(); 今天,brokeredMessage变量始终为null,但从下面的屏幕截图中可以看出,订阅中有几条消息。 那么为什么brokeredMessage == null? 已在管理门户中设置主题的设置,其中包括:默认消息生存时间= 1小时,重复检测历史记录= 10分钟,未发布发布前过滤消息,主题状态=启用,以及没有共享访问策略。 同样,对于订阅,设置为:默认消息生存时间= 10分钟,锁定持续时间= 5秒,最大传送计数= 10,主题订阅状态=启用,并且没有检查有关移动消息的复选框。 我会感激任何想法。

Azure ServiceBus和async – 是或不是?

我正在Azure上运行Service Bus, 每秒抽取大约10-100条消息 。 最近我转而使用.net 4.5并且所有兴奋重构的所有代码都在每行中至少有两次‘async’和’await ‘,以确保它’正确’完成’:) 现在我想知道它实际上是好还是坏 。 如果你能看一下代码片段,请告诉我你的想法。 我特别担心,如果线程上下文切换没有给我带来更多的悲伤而不是从所有异步中获益……(看看!dumpheap肯定是一个因素) 只是一点描述 – 我将发布2个方法 – 一个在ConcurrentQueue上执行while循环,等待新消息和另一个一次发送一条消息的方法。 我也正如Azure博士所规定的那样使用Transient Fault Handling块。 发送循环(从头开始,等待新消息): private async void SendingLoop() { try { await this.RecreateMessageFactory(); this.loopSemaphore.Reset(); Buffer message = null; while (true) { if (this.cancel.Token.IsCancellationRequested) { break; } this.semaphore.WaitOne(); if (this.cancel.Token.IsCancellationRequested) { break; } while (this.queue.TryDequeue(out message)) { try { using […]