在单独的线程上引发事件

我正在开发一个组件,需要处理实时馈送并以非常快的方式向听众广播数据(大约100纳秒级别的准确度,甚至比我能做到的还要少)目前我正在筹集一个事件来自我的订阅者可以订阅的代码。 但是因为在C#中,事件处理程序在引发事件的同一个线程上运行,所以引发事件的线程将被阻塞,直到所有订阅者完成事件的处理。 我无法控制订阅者的代码,因此他们可能会在事件处理程序中执行任何耗时的操作,这可能会阻塞正在广播的线程。

我该怎么办才能将数据广播给其他订阅者,但仍可以快速播放这些内容?

100 ns是一个非常艰难的目标。 我相信它会深刻理解你正在做什么以及为什么要达到这种性能。

但是,异步调用事件订阅者很容易解决。 这已经在这里得到了回答,还有谁,Jon Skeet。

foreach (MyDelegate action in multicast.GetInvocationList()) { action.BeginInvoke(...); } 

编辑:我还应该提到,您需要在实时操作系统上运行 ,以便为您的用户提供严格的性能保证。

看起来你正在寻找任务。 以下是我为我的工作编写的扩展方法,它可以异步调用一个事件,以便每个事件处理程序都在自己的线程上。 我不能对它的速度发表评论,因为这对我来说从来都不是必需的。


UPDATE

根据评论我调整它,以便只创建一个任务来调用所有订阅者

 ///  /// Extension method to safely encapsulate asynchronous event calls with checks ///  /// The event to call /// The sender of the event /// The arguments for the event /// The state information that is passed to the callback method ///  /// This method safely calls the each event handler attached to the event. This method uses  to /// asynchronously call invoke without any exception handling. As such, if any of the event handlers throw exceptions the application will /// most likely crash when the task is collected. This is an explicit decision since it is really in the hands of the event handler /// creators to make sure they handle issues that occur do to their code. There isn't really a way for the event raiser to know /// what is going on. ///  [System.Diagnostics.DebuggerStepThrough] public static void AsyncSafeInvoke( this EventHandler evnt, object sender, EventArgs args ) { // Used to make a temporary copy of the event to avoid possibility of // a race condition if the last subscriber unsubscribes // immediately after the null check and before the event is raised. EventHandler handler = evnt; if (handler != null) { // Manually calling all event handlers so that we could capture and aggregate all the // exceptions that are thrown by any of the event handlers attached to this event. var invocationList = handler.GetInvocationList(); Task.Factory.StartNew(() => { foreach (EventHandler h in invocationList) { // Explicitly not catching any exceptions. While there are several possibilities for handling these // exceptions, such as a callback, the correct place to handle the exception is in the event handler. h.Invoke(sender, args); } }); } } 

您可以在事件处理程序上使用这些简单的扩展方法:

 public static void Raise(this EventHandler handler, object sender, T e) where T : EventArgs { if (handler != null) handler(sender, e); } public static void Raise(this EventHandler handler, object sender, EventArgs e) { if (handler != null) handler(sender, e); } public static void RaiseOnDifferentThread(this EventHandler handler, object sender, T e) where T : EventArgs { if (handler != null) Task.Factory.StartNewOnDifferentThread(() => handler.Raise(sender, e)); } public static void RaiseOnDifferentThread(this EventHandler handler, object sender, EventArgs e) { if (handler != null) Task.Factory.StartNewOnDifferentThread(() => handler.Raise(sender, e)); } public static Task StartNewOnDifferentThread(this TaskFactory taskFactory, Action action) { return taskFactory.StartNew(action: action, cancellationToken: new CancellationToken()); } 

用法:

 public static Test() { myEventHandler.RaiseOnDifferentThread(null, EventArgs.Empty); } 

StartNew()是保证StartNew()实际使用不同线程所必需的,如此处所述。

我不能说这是否能够可靠地满足100ns的要求,但是这里有一个替代方案,你可以为最终用户提供一种方法,为你提供一个你可以填充的ConcurrentQueue,他们可以在一个单独的线程上收听。

 class Program { static void Main(string[] args) { var multicaster = new QueueMulticaster(); var listener1 = new Listener(); //Make a couple of listening Q objects. listener1.Listen(); multicaster.Subscribe(listener1); var listener2 = new Listener(); listener2.Listen(); multicaster.Subscribe(listener2); multicaster.Broadcast(6); //Send a 6 to both concurrent Queues. Console.ReadLine(); } } //The listeners would run on their own thread and poll the Q like crazy. class Listener : IListenToStuff { public ConcurrentQueue StuffQueue { get; set; } public void Listen() { StuffQueue = new ConcurrentQueue(); var t = new Thread(ListenAggressively); t.Start(); } void ListenAggressively() { while (true) { int val; if(StuffQueue.TryDequeue(out val)) Console.WriteLine(val); } } } //Simple class that allows you to subscribe a Queue to a broadcast event. public class QueueMulticaster { readonly List> _subscribers = new List>(); public void Subscribe(IListenToStuff subscriber) { _subscribers.Add(subscriber); } public void Broadcast(T value) { foreach (var listenToStuff in _subscribers) { listenToStuff.StuffQueue.Enqueue(value); } } } public interface IListenToStuff { ConcurrentQueue StuffQueue { get; set; } } 

由于您无法阻止对其他侦听器的处理,因此这意味着多个线程。 在侦听器上具有专用侦听线程似乎是一种合理的尝试方法,并发队列似乎是一个不错的传递机制。 在这个实现中,它只是不断轮询,但您可能使用线程信令来减少使用AutoResetEvent类的CPU负载。