使用ReceiveById可怕的MSMQ性能

每秒只有20条消息! 这就是我得到的! 这是从队列中查看50条消息并使用ReceiveById并行接收它们的代码。 队列中的消息总数是500.我也测试了其他数字。 但是上限是每秒20条消息! 我在某处完全不受影响吗?

编辑1:

1 – 我需要队列可以恢复。 但有趣的是,即使我将可恢复选项设置为false; 仍然是上限是20消息/秒。

2 – 我被迫在这里使用MSMQ,因为涉及一些遗留应用程序。 但是如果这个代码是正确的并且这个前20个限制确实存在,我可以说服该组切换。 因此,任何推荐(基于实际经验)替换MSMQ都是非常受欢迎的(请注意,如果出现任何类型的失败,我们需要保留我们的消息)。

3 – 我已经将ThreadPool中的线程数设置为高数,以防它有所帮助,但实际上在这段代码中它将导致创建100到200个线程。 我已经测试了从50到10000的不同数字,没有差异。

4 – 在每个任务中创建一个新的MessageQueue,因为ReceiveById不是线程安全的。

5 – 正如人们在代码中看到的那样,消息大小非常低; 它只是一个字符串加一个int。

编辑2:[ 非常奇怪的新结果 ]

我已经玩了这段代码的每一点,发现了这个:如果我注释掉行singleLocal.UseJournalQueue = false; 在我的任务中,我每秒最多可以读取1200条消息。 不令人印象深刻,但在我的情 奇怪的部分是UseJournalQueue的默认值为false; 为什么再次将其设置为false应该会在性能上产生这样的差异?

static partial class Program { static void Main(string[] args) { ThreadPool.SetMaxThreads(15000, 30000); ThreadPool.SetMinThreads(10000, 20000); var qName = @".\private$\deep_den"; if (!MessageQueue.Exists(qName)) { var q = MessageQueue.Create(qName); } var single = new MessageQueue(qName); single.UseJournalQueue = false; single.DefaultPropertiesToSend.AttachSenderId = false; single.DefaultPropertiesToSend.Recoverable = true; single.Formatter = new XmlMessageFormatter(new[] { typeof(Data) }); var count = 500; var watch = new Stopwatch(); watch.Start(); for (int i = 0; i < count; i++) { var data = new Data { Name = string.Format("name_{0}", i), Value = i }; single.Send(new Message(data)); } watch.Stop(); Console.WriteLine("sent {0} msec/message", watch.Elapsed.TotalMilliseconds / count); Console.WriteLine("sent {0} message/sec", count / watch.Elapsed.TotalSeconds); var enu = single.GetMessageEnumerator2(); watch.Reset(); watch.Start(); while (Interlocked.Read(ref __counter) < count) { var list = new List(); var peekCount = 50; while (peekCount > 0 && enu.MoveNext(TimeSpan.FromMilliseconds(10))) { try { list.Add(enu.Current); peekCount--; } catch (Exception ex2) { Trace.WriteLine(ex2.ToString()); break; } } var tlist = new List(); foreach (var message in list) { var stupid_closure = message; var t = new Task(() => { using (var singleLocal = new MessageQueue(qName)) { singleLocal.UseJournalQueue = false; singleLocal.DefaultPropertiesToSend.AttachSenderId = false; singleLocal.DefaultPropertiesToSend.Recoverable = true; singleLocal.Formatter = new XmlMessageFormatter(new[] { typeof(Data) }); try { // processing the message and insert it into database // workflow completed here, so we can safely remove the message from queue var localM = singleLocal.ReceiveById(stupid_closure.Id); var localSample = (Data)localM.Body; Interlocked.Increment(ref __counter); Console.WriteLine(Interlocked.Read(ref __counter)); } catch (MessageQueueException ex) { if (ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout) Trace.WriteLine(ex.ToString()); } catch (Exception ex2) { Trace.WriteLine(ex2.ToString()); } } }, TaskCreationOptions.PreferFairness); tlist.Add(t); } foreach (var t in tlist) t.Start(); Task.WaitAll(tlist.ToArray()); list.Clear(); } watch.Stop(); Console.WriteLine("rcvd {0} msec/message", watch.Elapsed.TotalMilliseconds / count); Console.WriteLine("rcvd {0} message/sec", count / watch.Elapsed.TotalSeconds); Console.WriteLine("press any key to continue ..."); Console.ReadKey(); } static long __counter = 0; } 

Kaveh,您正在使用的MessageQueue对象的构造函数将UseJournalQueue属性设置为true,以防启用消息队列对象的日记设置。 不知怎的,它正在考虑。\ private $ \ deep_den的日志设置已启用。 编辑 – 您使用的是预先创建的队列吗?

在对其重要性进行基准测试时,要将代码保持在最低限度,以避免背景噪声干扰测试。

不幸的是,你的测试是如此嘈杂,以至于很难找到导致延迟的确切原因

  • 不要使用线程。 multithreading很少有用,通常会带来更多弊大于利。
  • 只测试一件事。 当测试ReceiveById不使用GetMessageEnumerator2时,它的成本很高,您需要在结尾处将其从结果中删除。
  • 仅创建一次MessageQueue并重用它。 我们只测试ReceiveById而不是创建新的MessageQueue类。

我已经重写了测试并获得了更好的结果MSMQ不是块上最快的队列,但它并不慢。

  var qName = @".\private$\deep_den"; if (!MessageQueue.Exists(qName)) { var q = MessageQueue.Create(qName); } var single = new MessageQueue(qName); single.UseJournalQueue = true; single.DefaultPropertiesToSend.AttachSenderId = false; single.DefaultPropertiesToSend.Recoverable = true; single.Formatter = new XmlMessageFormatter(new[] { typeof(Data) }); var count = 500; var watch = new Stopwatch(); watch.Start(); for (int i = 0; i < count; i++) { var data = new Data { Name = string.Format("name_{0}", i), Value = i }; single.Send(new Message(data)); } watch.Stop(); Console.WriteLine("sent {0} msec/message", watch.Elapsed.TotalMilliseconds / count); Console.WriteLine("sent {0} message/sec", count / watch.Elapsed.TotalSeconds); var enu = single.GetMessageEnumerator2(); watch.Reset(); watch.Start(); var queue = new MessageQueue(qName); queue.UseJournalQueue = true; queue.DefaultPropertiesToSend.AttachSenderId = false; queue.DefaultPropertiesToSend.Recoverable = true; queue.Formatter = new XmlMessageFormatter(new[] { typeof(Data) }); List lst = new List(); while (lst.Count != count && enu.MoveNext(TimeSpan.FromDays(1))) { var message = queue.ReceiveById(enu.Current.Id); lst.Add((Data)message.Body); } watch.Stop(); Console.WriteLine("rcvd {0} msec/message", watch.Elapsed.TotalMilliseconds / count); Console.WriteLine("rcvd {0} message/sec", count / watch.Elapsed.TotalSeconds); Console.WriteLine("press any key to continue ..."); Console.ReadKey(); 

Kaveh,我在这里可能完全错了,但我认为你的问题是XML序列化。 一旦创建了XmlSerializer,它仍然可能很慢,但构造函数确实需要时间。

我建议要么完全删除序列化并将数据作为字符串读取,要么先建立一个XmlSerializer或XmlMessageFormatter并将其传递给线程。 我会说要小心线程问题,但看起来你对此有很好的把握。