异步/等待,自定义awaiter和垃圾收集器

我正在处理托管对象在async方法中过早完成的情况。

这是一个业余爱好的家庭自动化项目(Windows 8.1,.NET 4.5.1),我向非托管第三方DLL提供C#回调。 在某个传感器事件时调用回调。

要处理事件,我使用async/await和一个简单的自定义awaiter(而不是TaskCompletionSource )。 我这样做的部分原因是为了减少不必要的分配数量,但主要是出于好奇心作为学习练习。

下面是我所拥有的非常剥离的版本,使用Win32计时器队列计时器来模拟非托管事件源。 让我们从输出开始:

按Enter退出...
服务员()
打勾:0
打勾:1
 〜Awaiter()
打勾:2
打勾:3
打勾:4

请注意我的等待者在第二次打勾后如何最终确定。 这是出乎意料的。

代码(控制台应用程序):

 using System; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication { class Program { static async Task TestAsync() { var awaiter = new Awaiter(); //var hold = GCHandle.Alloc(awaiter); WaitOrTimerCallbackProc callback = (a, b) => awaiter.Continue(); IntPtr timerHandle; if (!CreateTimerQueueTimer(out timerHandle, IntPtr.Zero, callback, IntPtr.Zero, 500, 500, 0)) throw new System.ComponentModel.Win32Exception( Marshal.GetLastWin32Error()); var i = 0; while (true) { await awaiter; Console.WriteLine("tick: " + i++); } } static void Main(string[] args) { Console.WriteLine("Press Enter to exit..."); var task = TestAsync(); Thread.Sleep(1000); GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced); Console.ReadLine(); } // custom awaiter public class Awaiter : System.Runtime.CompilerServices.INotifyCompletion { Action _continuation; public Awaiter() { Console.WriteLine("Awaiter()"); } ~Awaiter() { Console.WriteLine("~Awaiter()"); } // resume after await, called upon external event public void Continue() { var continuation = Interlocked.Exchange(ref _continuation, null); if (continuation != null) continuation(); } // custom Awaiter methods public Awaiter GetAwaiter() { return this; } public bool IsCompleted { get { return false; } } public void GetResult() { } // INotifyCompletion public void OnCompleted(Action continuation) { Volatile.Write(ref _continuation, continuation); } } // p/invoke delegate void WaitOrTimerCallbackProc(IntPtr lpParameter, bool TimerOrWaitFired); [DllImport("kernel32.dll")] static extern bool CreateTimerQueueTimer(out IntPtr phNewTimer, IntPtr TimerQueue, WaitOrTimerCallbackProc Callback, IntPtr Parameter, uint DueTime, uint Period, uint Flags); } } 

我设法用这条线压制了awaiter的集合:

 var hold = GCHandle.Alloc(awaiter); 

但是,我不完全理解为什么我必须创建这样的强引用。 awaiter在无限循环内引用。 AFAICT,在TestAsync返回的任务完成(取消/故障)之前,它不会超出范围。 并且任务本身永远在Main内引用。

最后,我将TestAsync为:

 static async Task TestAsync() { var awaiter = new Awaiter(); //var hold = GCHandle.Alloc(awaiter); var i = 0; while (true) { await awaiter; Console.WriteLine("tick: " + i++); } } 

collections仍然发生。 我怀疑整个编译器生成的状态机对象正在收集。 有人可以解释为什么会这样吗?

现在,通过以下小修改, awaiter不再被垃圾收集:

 static async Task TestAsync() { var awaiter = new Awaiter(); //var hold = GCHandle.Alloc(awaiter); var i = 0; while (true) { //await awaiter; await Task.Delay(500); Console.WriteLine("tick: " + i++); } } 

更新后 , 这个小提琴显示了awaiter对象如何在没有任何p / invoke代码的情况下进行垃圾收集。 我认为,原因可能是在生成的状态机对象的初始状态之外 没有awaiter 外部引用。 我需要研究编译器生成的代码。


更新了 ,这是编译器生成的代码(对于这个小提琴 ,VS2012)。 显然, stateMachine.t__builder.Task返回的Task不会保留对状态机本身( stateMachine )的引用(或者更确切地说是副本)。 我错过了什么吗?

  private static Task TestAsync() { Program.TestAsyncd__0 stateMachine; stateMachine.t__builder = AsyncTaskMethodBuilder.Create(); stateMachine.1__state = -1; stateMachine.t__builder.Start(ref stateMachine); return stateMachine.t__builder.Task; } [CompilerGenerated] [StructLayout(LayoutKind.Auto)] private struct TestAsyncd__0 : IAsyncStateMachine { public int 1__state; public AsyncTaskMethodBuilder t__builder; public Program.Awaiter awaiter5__1; public int i5__2; private object u__awaiter3; private object t__stack; void IAsyncStateMachine.MoveNext() { try { bool flag = true; Program.Awaiter awaiter; switch (this.1__state) { case -3: goto label_7; case 0: awaiter = (Program.Awaiter) this.u__awaiter3; this.u__awaiter3 = (object) null; this.1__state = -1; break; default: this.awaiter5__1 = new Program.Awaiter(); this.i5__2 = 0; goto label_5; } label_4: awaiter.GetResult(); Console.WriteLine("tick: " + (object) this.i5__2++); label_5: awaiter = this.awaiter5__1.GetAwaiter(); if (!awaiter.IsCompleted) { this.1__state = 0; this.u__awaiter3 = (object) awaiter; this.t__builder.AwaitOnCompleted(ref awaiter, ref this); flag = false; return; } else goto label_4; } catch (Exception ex) { this.1__state = -2; this.t__builder.SetException(ex); return; } label_7: this.1__state = -2; this.t__builder.SetResult(); } [DebuggerHidden] void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine param0) { this.t__builder.SetStateMachine(param0); } } 

我已经删除了所有p / invoke的东西,并重新创建了编译器生成的状态机逻辑的简化版本。 它表现出相同的行为:在第一次调用状态机的MoveNext方法之后, awaiter会收集gaaraage。

Microsoft最近在为其.NET参考源提供Web UI方面做得非常出色,这非常有用。 在研究了AsyncTaskMethodBuilder的实现,最重要的是AsyncMethodBuilderCore.GetCompletionAction ,我现在相信我所看到的GC行为非常有意义 。 我将在下面解释一下。

代码:

 using System; using System.Threading; using System.Threading.Tasks; using System.Runtime.InteropServices; using System.Runtime.CompilerServices; namespace ConsoleApplication { public class Program { // Original version with async/await /* static async Task TestAsync() { Console.WriteLine("Enter TestAsync"); var awaiter = new Awaiter(); //var hold = GCHandle.Alloc(awaiter); var i = 0; while (true) { await awaiter; Console.WriteLine("tick: " + i++); } Console.WriteLine("Exit TestAsync"); } */ // Manually coded state machine version struct StateMachine: IAsyncStateMachine { public int _state; public Awaiter _awaiter; public AsyncTaskMethodBuilder _builder; public void MoveNext() { Console.WriteLine("StateMachine.MoveNext, state: " + this._state); switch (this._state) { case -1: { this._awaiter = new Awaiter(); goto case 0; }; case 0: { this._state = 0; var awaiter = this._awaiter; this._builder.AwaitOnCompleted(ref awaiter, ref this); return; }; default: throw new InvalidOperationException(); } } public void SetStateMachine(IAsyncStateMachine stateMachine) { Console.WriteLine("StateMachine.SetStateMachine, state: " + this._state); this._builder.SetStateMachine(stateMachine); // s_strongRef = stateMachine; } static object s_strongRef = null; } static Task TestAsync() { StateMachine stateMachine = new StateMachine(); stateMachine._state = -1; stateMachine._builder = AsyncTaskMethodBuilder.Create(); stateMachine._builder.Start(ref stateMachine); return stateMachine._builder.Task; } public static void Main(string[] args) { var task = TestAsync(); Thread.Sleep(1000); GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced); Console.WriteLine("Press Enter to exit..."); Console.ReadLine(); } // custom awaiter public class Awaiter : System.Runtime.CompilerServices.INotifyCompletion { Action _continuation; public Awaiter() { Console.WriteLine("Awaiter()"); } ~Awaiter() { Console.WriteLine("~Awaiter()"); } // resume after await, called upon external event public void Continue() { var continuation = Interlocked.Exchange(ref _continuation, null); if (continuation != null) continuation(); } // custom Awaiter methods public Awaiter GetAwaiter() { return this; } public bool IsCompleted { get { return false; } } public void GetResult() { } // INotifyCompletion public void OnCompleted(Action continuation) { Console.WriteLine("Awaiter.OnCompleted"); Volatile.Write(ref _continuation, continuation); } } } } 

编译器生成的状态机是一个可变结构,由ref传递。 显然,这是一个优化,以避免额外的分配。

其中核心部分发生在AsyncMethodBuilderCore.GetCompletionAction ,其中当前状态机结构被加框,并且对盒装副本的引用由传递给INotifyCompletion.OnCompleted的延续回调INotifyCompletion.OnCompleted

这是对状态机的唯一引用,它有机会站在GC并在await之后继续存在。 TestAsync返回的Task对象保存对它的引用,只有await continuation回调。 我相信这是故意的,以保持高效的GC行为。

注意注释行:

 // s_strongRef = stateMachine; 

如果我不对它进行评论,状态机的盒装副本就不会得到awaiter ,并且awaiter它作为其中的一部分保持活着。 当然,这不是解决方案,但它说明了问题。

所以,我得出以下结论。 虽然异步操作处于“正在进行中”并且状态机的状态( MoveNext )当前都没有被执行,但是继续回调的“守护者”的责任是强制保留回调本身,确保状态机的盒装副本不会被垃圾收集。

例如,在YieldAwaitable (由Task.Yield返回)的情况下,作为ThreadPool.QueueUserWorkItem调用的结果, ThreadPool.QueueUserWorkItem任务调度程序保留对continuation回调的外部引用。 对于Task.GetAwaiter ,它由任务对象间接引用 。

在我的例子中,延续回调的“守护者”是Awaiter本身。

因此,只要CLR不知道(在状态机对象之外)继续回调的外部引用,自定义等待者应该采取措施使回调对象保持活动状态。 反过来,这将使整个状态机保持活力。 在这种情况下,以下步骤是必要的:

  1. GCHandle.Alloc上调用回调上的INotifyCompletion.OnCompleted
  2. 在异步事件实际发生之前调用GCHandle.Free ,然后再调用continuation回调。
  3. 如果事件从未发生GCHandle.Free则实现IDispose以调用GCHandle.Free

鉴于此,下面是原始计时器回调代码的一个版本,它可以正常工作。 注意,没有必要强制保持定时器回调委托( WaitOrTimerCallbackProc callback )。 它作为状态机的一部分保持活着。 更新 :正如@svick所指出的,此语句可能特定于状态机的当前实现(C#5.0)。 我添加了GC.KeepAlive(callback)以消除对此行为的任何依赖,以防它在将来的编译器版本中发生更改。

 using System; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication { class Program { // Test task static async Task TestAsync(CancellationToken token) { using (var awaiter = new Awaiter()) { WaitOrTimerCallbackProc callback = (a, b) => awaiter.Continue(); try { IntPtr timerHandle; if (!CreateTimerQueueTimer(out timerHandle, IntPtr.Zero, callback, IntPtr.Zero, 500, 500, 0)) throw new System.ComponentModel.Win32Exception( Marshal.GetLastWin32Error()); try { var i = 0; while (true) { token.ThrowIfCancellationRequested(); await awaiter; Console.WriteLine("tick: " + i++); } } finally { DeleteTimerQueueTimer(IntPtr.Zero, timerHandle, IntPtr.Zero); } } finally { // reference the callback at the end // to avoid a chance for it to be GC'ed GC.KeepAlive(callback); } } } // Entry point static void Main(string[] args) { // cancel in 3s var testTask = TestAsync(new CancellationTokenSource(10 * 1000).Token); Thread.Sleep(1000); GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true); Thread.Sleep(2000); Console.WriteLine("Press Enter to GC..."); Console.ReadLine(); GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced); Console.WriteLine("Press Enter to exit..."); Console.ReadLine(); } // Custom awaiter public class Awaiter : System.Runtime.CompilerServices.INotifyCompletion, IDisposable { Action _continuation; GCHandle _hold = new GCHandle(); public Awaiter() { Console.WriteLine("Awaiter()"); } ~Awaiter() { Console.WriteLine("~Awaiter()"); } void ReleaseHold() { if (_hold.IsAllocated) _hold.Free(); } // resume after await, called upon external event public void Continue() { Action continuation; // it's OK to use lock (this) // the C# compiler would never do this, // because it's slated to work with struct awaiters lock (this) { continuation = _continuation; _continuation = null; ReleaseHold(); } if (continuation != null) continuation(); } // custom Awaiter methods public Awaiter GetAwaiter() { return this; } public bool IsCompleted { get { return false; } } public void GetResult() { } // INotifyCompletion public void OnCompleted(Action continuation) { lock (this) { ReleaseHold(); _continuation = continuation; _hold = GCHandle.Alloc(_continuation); } } // IDispose public void Dispose() { lock (this) { _continuation = null; ReleaseHold(); } } } // p/invoke delegate void WaitOrTimerCallbackProc(IntPtr lpParameter, bool TimerOrWaitFired); [DllImport("kernel32.dll")] static extern bool CreateTimerQueueTimer(out IntPtr phNewTimer, IntPtr TimerQueue, WaitOrTimerCallbackProc Callback, IntPtr Parameter, uint DueTime, uint Period, uint Flags); [DllImport("kernel32.dll")] static extern bool DeleteTimerQueueTimer(IntPtr TimerQueue, IntPtr Timer, IntPtr CompletionEvent); } }