C#事件:如何以并行方式处理事件

我有一个我希望以并行方式处理的事件。 我的想法是将每个回调添加到ThreadPool,effectivley有每个注册由ThreadPool处理的事件的方法。

我的试用代码如下所示:

Delegate[] delegates = myEvent.GetInvocationList(); IAsyncResult[] results = new IAsyncResult[ delegates.Count() ]; for ( int i = 0; i < delegates.Count(); i++ ) { IAsyncResult result = ( ( TestDelegate )delegates[ i ] ).BeginInvoke( "BeginInvoke/EndInvoke", null, null ); results[ i ] = result; } for ( int i = 0; i < delegates.Length; i++ ) { ( ( TestDelegate )delegates[ i ] ).EndInvoke( results[ i ] ); } 

这只是为了玩,因为我很好奇如何做到这一点。 我相信有更好的方法可以做到这一点。 我不喜欢有一个Func创建一个持有lambda的WaitCallback。 此外,与直接调用委托相比,DynamicInvoke相当慢。 我怀疑这种处理事件的方式比按顺序执行要快。

我的问题是:如何以并行方式处理事件,最好是使用ThreadPool?

由于我通常使用Mono,.NET 4.0或任务并行库都不是一个选项。

谢谢!

编辑: – 通过Earwickers答案更正了示例。 – 更新了试用代码

我将使用DynamicMethod(LCG)和一个带有参数的状态对象,并跟踪调用(以便您可以等待它们完成)。

代码:这样的东西应该做(虽然没有通过测试,但在某些情况下可能会抛出一些讨厌的exception):

 ///  /// Class for dynamic parallel invoking of a MulticastDelegate. /// (C) 2009 Arsène von Wyss, avw@gmx.ch /// No warranties of any kind, use at your own risk. Copyright notice must be kept in the source when re-used. ///  public static class ParallelInvoke { private class ParallelInvokeContext where TDelegate: class { private static readonly DynamicMethod invoker; private static readonly Type[] parameterTypes; static ParallelInvokeContext() { if (!typeof(Delegate).IsAssignableFrom(typeof(TDelegate))) { throw new InvalidOperationException("The TDelegate type must be a delegate"); } Debug.Assert(monitor_enter != null, "Could not find the method Monitor.Enter()"); Debug.Assert(monitor_pulse != null, "Could not find the method Monitor.Pulse()"); Debug.Assert(monitor_exit != null, "Could not find the method Monitor.Exit()"); FieldInfo parallelInvokeContext_activeCalls = typeof(ParallelInvokeContext).GetField("activeCalls", BindingFlags.Instance|BindingFlags.NonPublic); Debug.Assert(parallelInvokeContext_activeCalls != null, "Could not find the private field ParallelInvokeContext.activeCalls"); FieldInfo parallelInvokeContext_arguments = typeof(ParallelInvokeContext).GetField("arguments", BindingFlags.Instance|BindingFlags.NonPublic); Debug.Assert(parallelInvokeContext_arguments != null, "Could not find the private field ParallelInvokeContext.arguments"); MethodInfo delegate_invoke = typeof(TDelegate).GetMethod("Invoke", BindingFlags.Instance|BindingFlags.Public); Debug.Assert(delegate_invoke != null, string.Format("Could not find the method {0}.Invoke()", typeof(TDelegate).FullName)); if (delegate_invoke.ReturnType != typeof(void)) { throw new InvalidOperationException("The TDelegate delegate must not have a return value"); } ParameterInfo[] parameters = delegate_invoke.GetParameters(); parameterTypes = new Type[parameters.Length]; invoker = new DynamicMethod(string.Format("Invoker<{0}>", typeof(TDelegate).FullName), typeof(void), new[] {typeof(ParallelInvokeContext), typeof(object)}, typeof(ParallelInvokeContext), true); ILGenerator il = invoker.GetILGenerator(); LocalBuilder args = (parameters.Length > 2) ? il.DeclareLocal(typeof(object[])) : null; bool skipLoad = false; il.BeginExceptionBlock(); il.Emit(OpCodes.Ldarg_1); // the delegate we are going to invoke if (args != null) { Debug.Assert(args.LocalIndex == 0); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Ldfld, parallelInvokeContext_arguments); il.Emit(OpCodes.Dup); il.Emit(OpCodes.Stloc_0); skipLoad = true; } foreach (ParameterInfo parameter in parameters) { if (parameter.ParameterType.IsByRef) { throw new InvalidOperationException("The TDelegate delegate must note have out or ref parameters"); } parameterTypes[parameter.Position] = parameter.ParameterType; if (args == null) { il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Ldfld, parallelInvokeContext_arguments); } else if (skipLoad) { skipLoad = false; } else { il.Emit(OpCodes.Ldloc_0); } il.Emit(OpCodes.Ldc_I4, parameter.Position); il.Emit(OpCodes.Ldelem_Ref); if (parameter.ParameterType.IsValueType) { il.Emit(OpCodes.Unbox_Any, parameter.ParameterType); } } il.Emit(OpCodes.Callvirt, delegate_invoke); il.BeginFinallyBlock(); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Call, monitor_enter); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Dup); il.Emit(OpCodes.Ldfld, parallelInvokeContext_activeCalls); il.Emit(OpCodes.Ldc_I4_1); il.Emit(OpCodes.Sub); il.Emit(OpCodes.Dup); Label noPulse = il.DefineLabel(); il.Emit(OpCodes.Brtrue, noPulse); il.Emit(OpCodes.Stfld, parallelInvokeContext_activeCalls); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Call, monitor_pulse); Label exit = il.DefineLabel(); il.Emit(OpCodes.Br, exit); il.MarkLabel(noPulse); il.Emit(OpCodes.Stfld, parallelInvokeContext_activeCalls); il.MarkLabel(exit); il.Emit(OpCodes.Ldarg_0); il.Emit(OpCodes.Call, monitor_exit); il.EndExceptionBlock(); il.Emit(OpCodes.Ret); } [Conditional("DEBUG")] private static void VerifyArgumentsDebug(object[] args) { for (int i = 0; i < parameterTypes.Length; i++) { if (args[i] == null) { if (parameterTypes[i].IsValueType) { throw new ArgumentException(string.Format("The parameter {0} cannot be null, because it is a value type", i)); } } else if (!parameterTypes[i].IsAssignableFrom(args[i].GetType())) { throw new ArgumentException(string.Format("The parameter {0} is not compatible", i)); } } } private readonly object[] arguments; private readonly WaitCallback invokeCallback; private int activeCalls; public ParallelInvokeContext(object[] args) { if (parameterTypes.Length > 0) { if (args == null) { throw new ArgumentNullException("args"); } if (args.Length != parameterTypes.Length) { throw new ArgumentException("The parameter count does not match"); } VerifyArgumentsDebug(args); arguments = args; } else if ((args != null) && (args.Length > 0)) { throw new ArgumentException("This delegate does not expect any parameters"); } invokeCallback = (WaitCallback)invoker.CreateDelegate(typeof(WaitCallback), this); } public void QueueInvoke(Delegate @delegate) { Debug.Assert(@delegate is TDelegate); activeCalls++; ThreadPool.QueueUserWorkItem(invokeCallback, @delegate); } } private static readonly MethodInfo monitor_enter; private static readonly MethodInfo monitor_exit; private static readonly MethodInfo monitor_pulse; static ParallelInvoke() { monitor_enter = typeof(Monitor).GetMethod("Enter", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null); monitor_pulse = typeof(Monitor).GetMethod("Pulse", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null); monitor_exit = typeof(Monitor).GetMethod("Exit", BindingFlags.Static|BindingFlags.Public, null, new[] {typeof(object)}, null); } public static void Invoke(TDelegate @delegate) where TDelegate: class { Invoke(@delegate, null); } public static void Invoke(TDelegate @delegate, params object[] args) where TDelegate: class { if (@delegate == null) { throw new ArgumentNullException("delegate"); } ParallelInvokeContext context = new ParallelInvokeContext(args); lock (context) { foreach (Delegate invocationDelegate in ((Delegate)(object)@delegate).GetInvocationList()) { context.QueueInvoke(invocationDelegate); } Monitor.Wait(context); } } } 

用法:

 ParallelInvoke.Invoke(yourDelegate, arguments); 

笔记:

  • 不处理事件处理程序中的exception(但IL代码最终会减少计数器,以便方法可以正确结束)并且这可能会导致麻烦。 也可以在IL代码中捕获和传输exception。

  • 除了inheritance之外的隐式转换(例如int到double)不会执行,并会引发exception。

  • 使用的同步技术不分配OS等待句柄,这通常有利于提高性能。 可以在Joseph Albahari的页面上找到有关Monitor工作的说明 。

  • 经过一些性能测试后,似乎这种方法比使用委托上的“本机”BeginInvoke / EndInvoke调用的任何方法(至少在MS CLR中)更好地扩展。

如果已知委托的类型,则可以直接调用其BeginInvoke并将IAsyncResults存储在数组中以等待和结束调用。 请注意,您应该调用EndInvoke以避免潜在的资源泄漏 。 代码依赖于EndInvoke等待调用完成的事实,因此不需要WaitAll(并且,请注意, WaitAll有几个问题,以便我避免使用它)。

这是一个代码示例,同时是不同方法的简单基准:

 public static class MainClass { private delegate void TestDelegate(string x); private static void A(string x) {} private static void Invoke(TestDelegate test, string s) { Delegate[] delegates = test.GetInvocationList(); IAsyncResult[] results = new IAsyncResult[delegates.Length]; for (int i = 0; i < delegates.Length; i++) { results[i] = ((TestDelegate)delegates[i]).BeginInvoke("string", null, null); } for (int i = 0; i < delegates.Length; i++) { ((TestDelegate)delegates[i]).EndInvoke(results[i]); } } public static void Main(string[] args) { Console.WriteLine("Warm-up call"); TestDelegate test = A; test += A; test += A; test += A; test += A; test += A; test += A; test += A; test += A; test += A; // 10 times in the invocation list ParallelInvoke.Invoke(test, "string"); // warm-up Stopwatch sw = new Stopwatch(); GC.Collect(); GC.WaitForPendingFinalizers(); Console.WriteLine("Profiling calls"); sw.Start(); for (int i = 0; i < 100000; i++) { // ParallelInvoke.Invoke(test, "string"); // profiling ParallelInvoke Invoke(test, "string"); // profiling native BeginInvoke/EndInvoke } sw.Stop(); Console.WriteLine("Done in {0} ms", sw.ElapsedMilliseconds); Console.ReadKey(true); } } 

在我的旧笔记本电脑上,使用BeginInvoke / EndInvoke需要95553毫秒,而使用ParallelInvoke方法(MS .NET 3.5)需要9038毫秒。 因此,与ParallelInvoke解决方案相比,此方法的扩展性不佳。

您似乎在代码段中执行了两次异步启动。

首先,在委托上调用BeginInvoke – 这会对工作项进行排队,以便线程池将执行委托。

然后在该委托中,使用QueueUserWorkItem来排队另一个工作项,以便线程池将执行真正的委托。

这意味着当您从外部委托中返回IAsyncResult(以及因此等待句柄)时,它将在第二个工作项已排队时发出完成信号,而不是在它完成执行时发出信号。

你这样做是为了表现吗?

只有当它允许你让多个硬件并行工作时,它才会起作用,并且它会花费你的流程开关开销。