如何在C#中等待单个事件,超时和取消

所以我的要求是让我的函数等待第一个实例,来自另一个类和另一个线程的event Action ,并在我的线程上处理它,允许等待被超时或CancellationToken中断。

我想创建一个我可以重用的generics函数。 我设法创造了一些(我认为)我需要的选项,但两者看起来都比我想象的要复杂得多。

用法

为了清楚serialDevice ,此函数的示例使用将如下所示,其中serialDevice在单独的线程上吐出事件:

 var eventOccurred = Helper.WaitForSingleEvent( cancellationToken, statusPacket => OnStatusPacketReceived(statusPacket), a => serialDevice.StatusPacketReceived += a, a => serialDevice.StatusPacketReceived -= a, 5000, () => serialDevice.RequestStatusPacket()); 

选项1-ManualResetEventSlim

这个选项并不错,但是对于ManualResetEventSlimDispose处理比看起来应该更糟糕。 它给了ReSharper适合我在闭包内访问修改/处理的东西,而且真的很难遵循,所以我甚至不确定它是否正确。 也许有一些我遗漏的东西可以清理它,这是我的偏好,但我不会随便看到它。 这是代码。

 public static bool WaitForSingleEvent(this CancellationToken token, Action handler, Action<Action> subscribe, Action<Action> unsubscribe, int msTimeout, Action initializer = null) { var eventOccurred = false; var eventResult = default(TEvent); var o = new object(); var slim = new ManualResetEventSlim(); Action setResult = result => { lock (o) // ensures we get the first event only { if (!eventOccurred) { eventResult = result; eventOccurred = true; // ReSharper disable AccessToModifiedClosure // ReSharper disable AccessToDisposedClosure if (slim != null) { slim.Set(); } // ReSharper restore AccessToDisposedClosure // ReSharper restore AccessToModifiedClosure } } }; subscribe(setResult); try { if (initializer != null) { initializer(); } slim.Wait(msTimeout, token); } finally // ensures unsubscription in case of exception { unsubscribe(setResult); lock(o) // ensure we don't access slim { slim.Dispose(); slim = null; } } lock (o) // ensures our variables don't get changed in middle of things { if (eventOccurred) { handler(eventResult); } return eventOccurred; } } 

选项2 – 没有WaitHandle的轮询

这里的WaitForSingleEvent函数更清晰。 我可以使用ConcurrentQueue ,因此甚至不需要锁定。 但我只是不喜欢轮询functionSleep ,我不认为这种方法有任何解决方法。 我想传入一个WaitHandle而不是一个Func来清理Sleep ,但是第二个我这样做我已经让整个Dispose混乱再次清理。

 public static bool WaitForSingleEvent(this CancellationToken token, Action handler, Action<Action> subscribe, Action<Action> unsubscribe, int msTimeout, Action initializer = null) { var q = new ConcurrentQueue(); subscribe(q.Enqueue); try { if (initializer != null) { initializer(); } token.Sleep(msTimeout, () => !q.IsEmpty); } finally // ensures unsubscription in case of exception { unsubscribe(q.Enqueue); } TEvent eventResult; var eventOccurred = q.TryDequeue(out eventResult); if (eventOccurred) { handler(eventResult); } return eventOccurred; } public static void Sleep(this CancellationToken token, int ms, Func exitCondition) { var start = DateTime.Now; while ((DateTime.Now - start).TotalMilliseconds < ms && !exitCondition()) { token.ThrowIfCancellationRequested(); Thread.Sleep(1); } } 

这个问题

我并不特别关心这些解决方案中的任何一种,也不是100%确定它们中的任何一种都是100%正确的。 这些解决方案中的任何一个比其他解决方案更好(惯用性,效率等),还是有更简单的方法或内置函数来满足我在这里需要做的事情?

更新:目前为止的最佳答案

下面对TaskCompletionSource解决方案的修改。 没有长的闭合,锁或任何需要的东西。 看起来非常简单。 这里有错误吗?

 public static bool WaitForSingleEvent(this CancellationToken token, Action onEvent, Action<Action> subscribe, Action<Action> unsubscribe, int msTimeout, Action initializer = null) { var tcs = new TaskCompletionSource(); Action handler = result => tcs.TrySetResult(result); var task = tcs.Task; subscribe(handler); try { if (initializer != null) { initializer(); } task.Wait(msTimeout, token); } finally { unsubscribe(handler); // Do not dispose task http://blogs.msdn.com/b/pfxteam/archive/2012/03/25/10287435.aspx } if (task.Status == TaskStatus.RanToCompletion) { onEvent(task.Result); return true; } return false; } 

更新2:另一个很棒的解决方案

事实certificate, BlockingCollection工作方式与ConcurrentQueue类似,但也有接受超时和取消令牌的方法。 这个解决方案的一个WaitForNEvents是它可以更新,以便轻松地创建WaitForNEvents

 public static bool WaitForSingleEvent(this CancellationToken token, Action handler, Action<Action> subscribe, Action<Action> unsubscribe, int msTimeout, Action initializer = null) { var q = new BlockingCollection(); Action add = item => q.TryAdd(item); subscribe(add); try { if (initializer != null) { initializer(); } TEvent eventResult; if (q.TryTake(out eventResult, msTimeout, token)) { handler(eventResult); return true; } return false; } finally { unsubscribe(add); q.Dispose(); } } 

您可以使用Rx将事件转换为可观察事件,然后转换为任务,最后使用令牌/超时等待该任务。

这比任何现有解决方案的一个优点是,它调用unsubscribe事件的线程, 确保您的处理程序不会被调用两次。 (在你的第一个解决方案中,你通过tcs.TrySetResult而不是tcs.SetResult解决这个tcs.SetResult ,但是总是很好的摆脱“TryDoSomething”并简单地确保DoSomething始终有效)。

另一个优点是代码的简单性。 它基本上是一行。 所以你甚至不需要一个独立的function。 您可以内联它,以便更清楚您的代码究竟是什么,并且您可以在不需要大量可选参数的情况下对主题进行更改(例如您的可选initializer ,或允许等待N个事件,或者在实例中进行超时/取消)他们没有必要的地方)。 如果它完全有用的话,你可以在完成后获得bool返回值范围内的实际result

 using System.Reactive.Linq; using System.Reactive.Threading.Tasks; ... public static bool WaitForSingleEvent(this CancellationToken token, Action onEvent, Action> subscribe, Action> unsubscribe, int msTimeout, Action initializer = null) { var task = Observable.FromEvent(subscribe, unsubscribe).FirstAsync().ToTask(); if (initializer != null) { initializer(); } try { var finished = task.Wait(msTimeout, token); if (finished) onEvent(task.Result); return finished; } catch (OperationCanceledException) { return false; } } 

您可以使用TaskCompletetionSource创建一个可以标记为已完成或已取消的任务。 以下是特定事件的可能实现:

 public Task WaitFirstMyEvent(Foo target, CancellationToken cancellationToken) { var tcs = new TaskCompletionSource(); Action handler = null; var registration = cancellationToken.Register(() => { target.MyEvent -= handler; tcs.TrySetCanceled(); }); handler = () => { target.MyEvent -= handler; registration.Dispose(); tcs.TrySetResult(null); }; target.MyEvent += handler; return tcs.Task; } 

在C#5中,您可以像这样使用它:

 private async Task MyMethod() { ... await WaitFirstMyEvent(foo, cancellationToken); ... } 

如果要同步等待事件,还可以使用Wait方法:

 private void MyMethod() { ... WaitFirstMyEvent(foo, cancellationToken).Wait(); ... } 

这是一个更通用的版本,但它仍然只适用于具有Action签名的事件:

 public Task WaitFirstEvent( Action subscribe, Action unsubscribe, CancellationToken cancellationToken) { var tcs = new TaskCompletionSource(); Action handler = null; var registration = cancellationToken.Register(() => { unsubscribe(handler); tcs.TrySetCanceled(); }); handler = () => { unsubscribe(handler); registration.Dispose(); tcs.TrySetResult(null); }; subscribe(handler); return tcs.Task; } 

你可以像这样使用它:

 await WaitFirstEvent( handler => foo.MyEvent += handler, handler => foo.MyEvent -= handler, cancellationToken); 

如果您希望它与其他事件签名(例如EventHandler )一起使用,则必须创建单独的重载。 我不认为有一种简单的方法可以使它适用于任何签名,特别是因为参数的数量并不总是相同的。