如何避免使用异步void事件处理程序重入?

在WPF应用程序中,我有一个通过网络接收消息的类。 每当所述类的对象收到完整消息时,就会引发一个事件。 在应用程序的MainWindow中,我有一个订阅该事件的事件处理程序。 保证在应用程序的GUI线程上调用事件处理程序。

每当调用事件处理程序时,都需要将消息的内容应用于模型。 这样做可能非常昂贵(在当前硬件上> 200ms)。 这就是使用Task.Run将应用消息卸载到线程池的原因。

现在,可以非常接近地接收消息,因此可以在仍在处理先前的更改时调用事件处理程序。 确保消息仅在一次应用时最简单的方法是什么? 到目前为止,我已经提出以下建议:

using System; using System.Threading.Tasks; using System.Windows; public partial class MainWindow : Window { private Model model = new Model(); private Task pending = Task.FromResult(false); // Assume e carries a message received over the network. private void OnMessageReceived(object sender, EventArgs e) { this.pending = ApplyToModel(e); } private async Task ApplyToModel(EventArgs e) { await this.pending; await Task.Run(() => this.model.Apply(e)); // Assume this is an expensive call. } } 

这似乎按预期工作,但是它似乎也会不可避免地产生“内存泄漏”,因为应用消息的任务将始终首先等待应用前一个消息的任务。 如果是这样,那么以下更改应该避免泄漏:

 private async Task ApplyToModel(EventArgs e) { if (!this.pending.IsCompleted) { await this.pending; } await Task.Run(() => this.model.Apply(e)); } 

这是避免使用异步void事件处理程序重入的一种明智方法吗?

编辑 :删除了不必要的await this.pending; OnMessageReceived语句。

编辑2 :消息必须按照收到的顺序应用于模型。

我们需要感谢Stephen Toub,因为他在博客系列中展示了一些非常有用的异步锁定结构,包括异步锁定块。

以下是该文章的代码(包括本系列前一篇文章中的一些代码):

 public class AsyncLock { private readonly AsyncSemaphore m_semaphore; private readonly Task m_releaser; public AsyncLock() { m_semaphore = new AsyncSemaphore(1); m_releaser = Task.FromResult(new Releaser(this)); } public Task LockAsync() { var wait = m_semaphore.WaitAsync(); return wait.IsCompleted ? m_releaser : wait.ContinueWith((_, state) => new Releaser((AsyncLock)state), this, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); } public struct Releaser : IDisposable { private readonly AsyncLock m_toRelease; internal Releaser(AsyncLock toRelease) { m_toRelease = toRelease; } public void Dispose() { if (m_toRelease != null) m_toRelease.m_semaphore.Release(); } } } public class AsyncSemaphore { private readonly static Task s_completed = Task.FromResult(true); private readonly Queue> m_waiters = new Queue>(); private int m_currentCount; public AsyncSemaphore(int initialCount) { if (initialCount < 0) throw new ArgumentOutOfRangeException("initialCount"); m_currentCount = initialCount; } public Task WaitAsync() { lock (m_waiters) { if (m_currentCount > 0) { --m_currentCount; return s_completed; } else { var waiter = new TaskCompletionSource(); m_waiters.Enqueue(waiter); return waiter.Task; } } } public void Release() { TaskCompletionSource toRelease = null; lock (m_waiters) { if (m_waiters.Count > 0) toRelease = m_waiters.Dequeue(); else ++m_currentCount; } if (toRelease != null) toRelease.SetResult(true); } } 

现在将它应用于您的案例:

 private readonly AsyncLock m_lock = new AsyncLock(); private async void OnMessageReceived(object sender, EventArgs e) { using(var releaser = await m_lock.LockAsync()) { await Task.Run(() => this.model.Apply(e)); } } 

给定一个使用异步等待的事件处理程序,我们不能在Task外部使用锁,因为调用线程对于每个事件调用都是相同的,因此锁将始终让它通过。

 var object m_LockObject = new Object(); private async void OnMessageReceived(object sender, EventArgs e) { // Does not work Monitor.Enter(m_LockObject); await Task.Run(() => this.model.Apply(e)); Monitor.Exit(m_LockObject); } 

但是我们可以在Task中锁定,因为Task.Run总是生成一个新的Task,它不会在同一个线程上并行运行

 var object m_LockObject = new Object(); private async void OnMessageReceived(object sender, EventArgs e) { await Task.Run(() => { // Does work lock(m_LockObject) { this.model.Apply(e); } }); } 

因此,当一个事件调用OnMessageReceived时,它将返回immidiatly和model.Apply仅一个接一个地输入。