为什么我的破坏者的例子如此之慢?

我从Stack Overflow问题Disruptor.NET示例中获取了代码示例,并将其修改为“测量”时间。 完整列表如下:

using System; using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; using Disruptor; using Disruptor.Dsl; namespace DisruptorTest { public sealed class ValueEntry { public long Value { get; set; } public ValueEntry() { Console.WriteLine("New ValueEntry created"); } } public class ValueAdditionHandler : IEventHandler { public void OnNext(ValueEntry data, long sequence, bool endOfBatch) { Program.sw.Stop(); long microseconds = Program.sw.ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L)); Console.WriteLine("elapsed microseconds = " + microseconds); Console.WriteLine("Event handled: Value = {0} (processed event {1}", data.Value, sequence); } } class Program { public static Stopwatch sw = Stopwatch.StartNew(); private static readonly Random _random = new Random(); private static readonly int _ringSize = 16; // Must be multiple of 2 static void Main(string[] args) { var disruptor = new Disruptor.Dsl.Disruptor(() => new ValueEntry(), _ringSize, TaskScheduler.Default); disruptor.HandleEventsWith(new ValueAdditionHandler()); var ringBuffer = disruptor.Start(); while (true) { var valueToSet = _random.Next(); long sequenceNo = ringBuffer.Next(); ValueEntry entry = ringBuffer[sequenceNo]; entry.Value = valueToSet; sw.Restart(); ringBuffer.Publish(sequenceNo); Console.WriteLine("Published entry {0}, value {1}", sequenceNo, entry.Value); Thread.Sleep(1000); } } } } 

输出是:

 New ValueEntry created New ValueEntry created New ValueEntry created New ValueEntry created New ValueEntry created New ValueEntry created New ValueEntry created New ValueEntry created New ValueEntry created New ValueEntry created New ValueEntry created New ValueEntry created New ValueEntry created New ValueEntry created New ValueEntry created New ValueEntry created Published entry 0, value 1510145842 elapsed microseconds = 2205 Event handled: Value = 1510145842 (processed event 0 Published entry 1, value 1718075893 elapsed microseconds = 85 Event handled: Value = 1718075893 (processed event 1 Published entry 2, value 1675907645 elapsed microseconds = 32 Event handled: Value = 1675907645 (processed event 2 Published entry 3, value 1563009446 elapsed microseconds = 75 Event handled: Value = 1563009446 (processed event 3 Published entry 4, value 1782914062 elapsed microseconds = 34 Event handled: Value = 1782914062 (processed event 4 Published entry 5, value 1516398244 elapsed microseconds = 50 Event handled: Value = 1516398244 (processed event 5 Published entry 6, value 76829327 elapsed microseconds = 50 Event handled: Value = 76829327 (processed event 6 

因此,将数据从一个线程传递到另一个线程需要大约50微秒。 但它并不快! “Disruptor的当前版本可以在线程之间以每秒100万条消息的速度进行~50 ns。” 所以我的结果比预期慢1000倍。

我的例子有什么问题,如何实现50 ns的速度?

我已修改上面的程序,现在接收1微秒的延迟,这要好得多。 但是,我仍在等待disruptor模式专家的回应。 我正在寻找一个可以certificate我可以在50 ns内实际传递数据的例子。

我也使用BlockingCollection编写了相同的测试,平均收到了14微秒,这certificate了Disruptor更快:

使用BlockingCollection:

 average = 14 minimum = 0 0-5 = 890558, 5-10 = 1773781, 10-30 = 6900128, >30 = 435433 

使用Disruptor:

 average = 0 minimum = 0 0-5 = 9908469, 5-10 = 64464, 10-30 = 19902, >30 = 7065 

BlockingCollection代码:

 using System; using System.Collections.Concurrent; using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace DisruptorTest { public sealed class ValueEntry { public int Value { get; set; } public ValueEntry() { // Console.WriteLine("New ValueEntry created"); } } //public class ValueAdditionHandler : IEventHandler //{ // public void OnNext(ValueEntry data, long sequence, bool endOfBatch) // { // long microseconds = Program.sw[data.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L)); // Program.results[data.Value] = microseconds; // //Console.WriteLine("elapsed microseconds = " + microseconds); // //Console.WriteLine("Event handled: Value = {0} (processed event {1}", data.Value, sequence); // } //} class Program { public const int length = 10000000; public static Stopwatch[] sw = new Stopwatch[length]; public static long[] results = new long[length]; static BlockingCollection dataItems = new BlockingCollection(150); static void Main(string[] args) { for (int i = 0; i  { while (!dataItems.IsCompleted) { ValueEntry ve = null; try { ve = dataItems.Take(); long microseconds = sw[ve.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L)); results[ve.Value] = microseconds; //Console.WriteLine("elapsed microseconds = " + microseconds); //Console.WriteLine("Event handled: Value = {0} (processed event {1}", ve.Value, ve.Value); } catch (InvalidOperationException) { } } }, TaskCreationOptions.LongRunning); for (int i = 0; i < length; i++) { var valueToSet = i; ValueEntry entry = new ValueEntry(); entry.Value = valueToSet; sw[i].Restart(); dataItems.Add(entry); //Console.WriteLine("Published entry {0}, value {1}", valueToSet, entry.Value); //Thread.Sleep(1000); } // Wait until all events are delivered Thread.Sleep(5000); long average = 0; long minimum = 10000000000; int firstFive = 0; int fiveToTen = 0; int tenToThirty = 0; int moreThenThirty = 0; // Do not count first 100 items because they could be extremely slow for (int i = 100; i < length; i++) { average += results[i]; if (results[i] < minimum) { minimum = results[i]; } if (results[i] < 5) { firstFive++; } else if (results[i] < 10) { fiveToTen++; } else if (results[i] 30 = {5}", average, minimum, firstFive, fiveToTen, tenToThirty, moreThenThirty); } } } 

破坏者代码:

 using System; using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; using Disruptor; using Disruptor.Dsl; namespace DisruptorTest { public sealed class ValueEntry { public int Value { get; set; } public ValueEntry() { // Console.WriteLine("New ValueEntry created"); } } public class ValueAdditionHandler : IEventHandler { public void OnNext(ValueEntry data, long sequence, bool endOfBatch) { long microseconds = Program.sw[data.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L)); Program.results[data.Value] = microseconds; //Console.WriteLine("elapsed microseconds = " + microseconds); //Console.WriteLine("Event handled: Value = {0} (processed event {1}", data.Value, sequence); } } class Program { public const int length = 10000000; public static Stopwatch[] sw = new Stopwatch[length]; public static long[] results = new long[length]; private static readonly Random _random = new Random(); private static readonly int _ringSize = 1024; // Must be multiple of 2 static void Main(string[] args) { for (int i = 0; i < length; i++) { sw[i] = Stopwatch.StartNew(); } var disruptor = new Disruptor.Dsl.Disruptor(() => new ValueEntry(), _ringSize, TaskScheduler.Default); disruptor.HandleEventsWith(new ValueAdditionHandler()); var ringBuffer = disruptor.Start(); for (int i = 0; i < length; i++) { var valueToSet = i; long sequenceNo = ringBuffer.Next(); ValueEntry entry = ringBuffer[sequenceNo]; entry.Value = valueToSet; sw[i].Restart(); ringBuffer.Publish(sequenceNo); //Console.WriteLine("Published entry {0}, value {1}", sequenceNo, entry.Value); //Thread.Sleep(1000); } // wait until all events are delivered Thread.Sleep(5000); long average = 0; long minimum = 10000000000; int firstFive = 0; int fiveToTen = 0; int tenToThirty = 0; int moreThenThirty = 0; // Do not count first 100 items because they could be extremely slow for (int i = 100; i < length; i++) { average += results[i]; if (results[i] < minimum) { minimum = results[i]; } if (results[i] < 5) { firstFive++; } else if (results[i] < 10) { fiveToTen++; } else if (results[i] 30 = {5}", average, minimum, firstFive, fiveToTen, tenToThirty, moreThenThirty); } } } 

在这里,我修复了你的代码:

 using System; using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; using Disruptor; using Disruptor.Dsl; namespace DisruptorTest { public sealed class ValueEntry { public int Value { get; set; } public ValueEntry() { // Console.WriteLine("New ValueEntry created"); } } class Program { public const int length = 1000000; public static Stopwatch sw; private static readonly Random _random = new Random(); private static readonly int _ringSize = 1024; // Must be multiple of 2 static void Main(string[] args) { sw = Stopwatch.StartNew(); var disruptor = new Disruptor.Dsl.Disruptor(() => new ValueEntry(), _ringSize, TaskScheduler.Default); var ringBuffer = disruptor.Start(); for (int i = 0; i < length; i++) { var valueToSet = i; long sequenceNo = ringBuffer.Next(); ValueEntry entry = ringBuffer[sequenceNo]; entry.Value = valueToSet; ringBuffer.Publish(sequenceNo); //Console.WriteLine("Published entry {0}, value {1}", sequenceNo, entry.Value); //Thread.Sleep(1000); } var elapsed = sw.Elapsed.Miliseconds(); // wait until all events are delivered Thread.Sleep(10000); double average = /(double)length; Console.WriteLine("average = " + average); } } } 

这应该正确测试每个项目需要多长时间。