0MQ:如何以线程安全的方式使用ZeroMQ?

我阅读了ZeroMq指南 ,我偶然发现了以下内容:

您不能在线程之间共享ØMQ套接字。 ØMQ套接字不是线程安全的。 从技术上讲,它可以做到这一点,但它需要信号量,锁或互斥量。 这将使您的应用程序变得缓慢而脆弱。 在线程之间共享套接字远程理解的唯一地方是语言绑定,需要像套接字上的垃圾收集那样做魔术。

后来:

切记: 除了创建它们的线程外,不要使用或关闭套接字。

我也明白ZeroMQ Context是线程安全的。

如果一个类在.Net中注册另一个类的事件,则可以从与创建监听器的线程不同的线程调用此事件。

我认为只有两个选项可以从事件处理程序中通过ZeroMQ-Sockets调度:

  • 将eventhandler-invoking-thread同步到创建ZeroMQ- Socket的线程
  • 通过使用线程安全的ZeroMQ- Context为事件处理程序中的线程创建一个新的ZeroMQ- Socket /获取现有的ZeroMQ- Socket

似乎0MQ-Guide不鼓励第一个,我不认为为每个线程创建一个新的ZeroMq-Socket是高性能/可行的方式。

我的问题
在事件处理程序中通过0MQ发布消息的正确模式(它的意图是什么)是什么?

此外,该指南的作者在撰写时还考虑了.Net的ZeroMQ-Binding:

在线程之间共享套接字远程理解的唯一地方是语言绑定,需要像套接字上的垃圾收集那样做魔术。 ?

这里有一些示例代码来强调我的问题/问题:

 public class ExampleClass { public event EventHandler SomethinIsCalledFromAnotherThread; } public class ByteEventArgs : EventArgs { public byte[] BytesToSend; } public class Dispatcher { ZMQ.Context ctx; public Dispatcher(ZMQ.Context mqcontext, ExampleClass exampleClassInstance) { this.ctx = mqcontext; exampleClassInstance.SomethinIsCalledFromAnotherThread += new EventHandler(exampleClass_SomethinIsCalledFromAnotherThread); } void exampleClass_SomethinIsCalledFromAnotherThread(object sender, ByteEventArgs e) { // this method might be called by a different thread. So I have to get a new socket etc? using (var socket = ctx.Socket(ZMQ.SocketType.PUSH)) { // init socket etc..... and finally: socket.Send(e.BytesToSend); } // isn't that too much overhead? } } 

在.net framework v4及更高版本中,您可以使用并发集合来解决此问题。 即生产者 – 消费者模式。 多个线程(处理程序)可以将数据发送到线程安全队列,只有单个线程使用队列中的数据并使用套接字发送它。

这是一个想法:

 sendQueue = new BlockingCollection(new ConcurrentQueue()); // concurrent queue can accept from multiple threads/handlers safely MyHandler += (MyStuff stuffToSend) => sendQueue.Add(stuffToSend); // start single-threaded data send loop Task.Factory.StartNew(() => { using(var socket = context.Socket()) { MyStuff stuffToSend; // this enumerable will be blocking until CompleteAdding is called foreach(var stuff in sendQueue.GetConsumingEnumerable()) socket.Send(stuff.Serialize()); } }); // break out of the send loop when done OnMyAppExit += sendQueue.CompleteAdding; 

您可以创建许多0MQ套接字,当然可以创建多个线程。 如果在一个线程中创建套接字,并在另一个线程中使用它,则必须在两个操作之间执行完整的内存屏障。 任何其他东西都会导致libzmq出现奇怪的随机故障,因为套接字对象不是线程安全的。

有一些传统模式,但我不知道这些模式是如何专门针对.NET的:

  1. 在使用它们的线程中创建套接字,句点。 在紧密绑定到一个进程的线程之间共享上下文,并在未严格绑定的线程中创建单独的内容。 在高级C API(czmq)中,这些称为附加和分离线程。
  2. 在父线程中创建套接字,并在线程创建时将其传递给附加线程。 线程创建调用将执行完整的内存屏障。 从那时起, 在子线程中使用套接字。 “use”表示recv,send,setsockopt,getsockopt和close。
  3. 在一个线程中创建一个套接字,并在另一个线程中使用,在每次使用之间执行自己的完整内存屏障。 这是非常微妙的,如果你不知道什么是“完全记忆障碍”,你不应该这样做。

别忘了看一下inproc运输。 使用inproc://套接字进行线程间通信可能很有用,并且有一个线程可以打开套接字与其他进程/服务器通信。

每个线程仍然需要至少一个套接字,但inproc的套接字根本不涉及IP网络层。