Disruptor示例,包含1个发布者和4个并行使用者

在这个例子中https://stackoverflow.com/a/9980346/93647和这里为什么我的破坏者示例如此之慢? (在问题的最后)有1个发布者发布项目和1个消费者。

但就我而言,消费者工作要复杂得多,需要一些时间。 所以我想要4个并行处理数据的消费者。

因此,例如,如果生产者产生数字:1,2,3,4,5,6,7,8,9,10,11 ..

我希望consumer1能够捕获1,5,9,… consumer2捕获2,6,10,… consumer3捕获3,7,11,… consumer4捕获4,8,12 ……(不完全是这些数字,这个想法是数据应该并行处理,我不关心在哪个消费者处理哪个数字)

并且记住这需要并行完成,因为在实际应用中,消费者工作非常昂贵。 我希望消费者能够在不同的线程中执行以使用多核系统的function。

当然我可以创建4个环形缓冲区并将1个消费者连接到1个环形缓冲区。 这样我就可以使用原始示例了。 但我觉得这不正确。 可能创建1个发布者(1个铃声缓冲器)和4个消费者是正确的 – 因为这是我需要的。

添加指向Google群组中非常类似问题的链接: https ://groups.google.com/forum/#!msg / lmax-disruptor / -CLapWuwWLU / GHEP4UkxrAEJ

所以我们有两个选择:

  • 一个环很多消费者(每个消费者会在每次添加时“唤醒”,所有消费者应该拥有相同的WaitStrategy)
  • 许多“一个环 – 一个消费者”(每个消费者只会在它应该处理的数据上醒来。每个消费者都可以拥有自己的WaitStrategy)。

编辑 :我忘了提到代码部分取自FAQ 。 我不知道这种方法是否比弗兰克的建议更好或更差。

该项目严重记录在案,这看起来很不错,这是一种耻辱。
无论如何尝试以下剪辑(基于您的第一个链接) – 在单声道上测试,似乎没问题:

using System; using System.Threading.Tasks; using Disruptor; using Disruptor.Dsl; namespace DisruptorTest { public sealed class ValueEntry { public long Value { get; set; } } public class MyHandler : IEventHandler { private static int _consumers = 0; private readonly int _ordinal; public MyHandler() { this._ordinal = _consumers++; } public void OnNext(ValueEntry data, long sequence, bool endOfBatch) { if ((sequence % _consumers) == _ordinal) Console.WriteLine("Event handled: Value = {0}, event {1} processed by {2}", data.Value, sequence, _ordinal); else Console.WriteLine("Event {0} rejected by {1}", sequence, _ordinal); } } class Program { private static readonly Random _random = new Random(); private const int SIZE = 16; // Must be multiple of 2 private const int WORKERS = 4; static void Main() { var disruptor = new Disruptor.Dsl.Disruptor(() => new ValueEntry(), SIZE, TaskScheduler.Default); for (int i=0; i < WORKERS; i++) disruptor.HandleEventsWith(new MyHandler()); var ringBuffer = disruptor.Start(); while (true) { long sequenceNo = ringBuffer.Next(); ringBuffer[sequenceNo].Value = _random.Next();; ringBuffer.Publish(sequenceNo); Console.WriteLine("Published entry {0}, value {1}", sequenceNo, ringBuffer[sequenceNo].Value); Console.ReadKey(); } } } } 

根据环形缓冲区的规格,您将看到每个消费者都会尝试处理您的ValueEvent 。 在你的情况下,你不需要。

我这样解决了:

添加一个处理到ValueEvent的字段,当消费者接受他在该字段上测试的事件时,如果已经处理,则他继续ValueEvent到下一个字段。

不是最漂亮的方式,但它是缓冲区的工作方式。