异步/等待,自定义awaiter和垃圾收集器
我正在处理托管对象在async
方法中过早完成的情况。
这是一个业余爱好的家庭自动化项目(Windows 8.1,.NET 4.5.1),我向非托管第三方DLL提供C#回调。 在某个传感器事件时调用回调。
要处理事件,我使用async/await
和一个简单的自定义awaiter(而不是TaskCompletionSource
)。 我这样做的部分原因是为了减少不必要的分配数量,但主要是出于好奇心作为学习练习。
下面是我所拥有的非常剥离的版本,使用Win32计时器队列计时器来模拟非托管事件源。 让我们从输出开始:
按Enter退出... 服务员() 打勾:0 打勾:1 〜Awaiter() 打勾:2 打勾:3 打勾:4
请注意我的等待者在第二次打勾后如何最终确定。 这是出乎意料的。
代码(控制台应用程序):
using System; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication { class Program { static async Task TestAsync() { var awaiter = new Awaiter(); //var hold = GCHandle.Alloc(awaiter); WaitOrTimerCallbackProc callback = (a, b) => awaiter.Continue(); IntPtr timerHandle; if (!CreateTimerQueueTimer(out timerHandle, IntPtr.Zero, callback, IntPtr.Zero, 500, 500, 0)) throw new System.ComponentModel.Win32Exception( Marshal.GetLastWin32Error()); var i = 0; while (true) { await awaiter; Console.WriteLine("tick: " + i++); } } static void Main(string[] args) { Console.WriteLine("Press Enter to exit..."); var task = TestAsync(); Thread.Sleep(1000); GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced); Console.ReadLine(); } // custom awaiter public class Awaiter : System.Runtime.CompilerServices.INotifyCompletion { Action _continuation; public Awaiter() { Console.WriteLine("Awaiter()"); } ~Awaiter() { Console.WriteLine("~Awaiter()"); } // resume after await, called upon external event public void Continue() { var continuation = Interlocked.Exchange(ref _continuation, null); if (continuation != null) continuation(); } // custom Awaiter methods public Awaiter GetAwaiter() { return this; } public bool IsCompleted { get { return false; } } public void GetResult() { } // INotifyCompletion public void OnCompleted(Action continuation) { Volatile.Write(ref _continuation, continuation); } } // p/invoke delegate void WaitOrTimerCallbackProc(IntPtr lpParameter, bool TimerOrWaitFired); [DllImport("kernel32.dll")] static extern bool CreateTimerQueueTimer(out IntPtr phNewTimer, IntPtr TimerQueue, WaitOrTimerCallbackProc Callback, IntPtr Parameter, uint DueTime, uint Period, uint Flags); } }
我设法用这条线压制了awaiter
的集合:
var hold = GCHandle.Alloc(awaiter);
但是,我不完全理解为什么我必须创建这样的强引用。 awaiter
在无限循环内引用。 AFAICT,在TestAsync
返回的任务完成(取消/故障)之前,它不会超出范围。 并且任务本身永远在Main
内引用。
最后,我将TestAsync
为:
static async Task TestAsync() { var awaiter = new Awaiter(); //var hold = GCHandle.Alloc(awaiter); var i = 0; while (true) { await awaiter; Console.WriteLine("tick: " + i++); } }
collections仍然发生。 我怀疑整个编译器生成的状态机对象正在收集。 有人可以解释为什么会这样吗?
现在,通过以下小修改, awaiter
不再被垃圾收集:
static async Task TestAsync() { var awaiter = new Awaiter(); //var hold = GCHandle.Alloc(awaiter); var i = 0; while (true) { //await awaiter; await Task.Delay(500); Console.WriteLine("tick: " + i++); } }
更新后 , 这个小提琴显示了awaiter
对象如何在没有任何p / invoke代码的情况下进行垃圾收集。 我认为,原因可能是在生成的状态机对象的初始状态之外 没有对awaiter
外部引用。 我需要研究编译器生成的代码。
更新了 ,这是编译器生成的代码(对于这个小提琴 ,VS2012)。 显然, stateMachine.t__builder.Task
返回的Task
不会保留对状态机本身( stateMachine
)的引用(或者更确切地说是副本)。 我错过了什么吗?
private static Task TestAsync() { Program.TestAsyncd__0 stateMachine; stateMachine.t__builder = AsyncTaskMethodBuilder.Create(); stateMachine.1__state = -1; stateMachine.t__builder.Start(ref stateMachine); return stateMachine.t__builder.Task; } [CompilerGenerated] [StructLayout(LayoutKind.Auto)] private struct TestAsyncd__0 : IAsyncStateMachine { public int 1__state; public AsyncTaskMethodBuilder t__builder; public Program.Awaiter awaiter5__1; public int i5__2; private object u__awaiter3; private object t__stack; void IAsyncStateMachine.MoveNext() { try { bool flag = true; Program.Awaiter awaiter; switch (this.1__state) { case -3: goto label_7; case 0: awaiter = (Program.Awaiter) this.u__awaiter3; this.u__awaiter3 = (object) null; this.1__state = -1; break; default: this.awaiter5__1 = new Program.Awaiter(); this.i5__2 = 0; goto label_5; } label_4: awaiter.GetResult(); Console.WriteLine("tick: " + (object) this.i5__2++); label_5: awaiter = this.awaiter5__1.GetAwaiter(); if (!awaiter.IsCompleted) { this.1__state = 0; this.u__awaiter3 = (object) awaiter; this.t__builder.AwaitOnCompleted(ref awaiter, ref this); flag = false; return; } else goto label_4; } catch (Exception ex) { this.1__state = -2; this.t__builder.SetException(ex); return; } label_7: this.1__state = -2; this.t__builder.SetResult(); } [DebuggerHidden] void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine param0) { this.t__builder.SetStateMachine(param0); } }
我已经删除了所有p / invoke的东西,并重新创建了编译器生成的状态机逻辑的简化版本。 它表现出相同的行为:在第一次调用状态机的MoveNext
方法之后, awaiter
会收集gaaraage。
Microsoft最近在为其.NET参考源提供Web UI方面做得非常出色,这非常有用。 在研究了AsyncTaskMethodBuilder
的实现,最重要的是AsyncMethodBuilderCore.GetCompletionAction
,我现在相信我所看到的GC行为非常有意义 。 我将在下面解释一下。
代码:
using System; using System.Threading; using System.Threading.Tasks; using System.Runtime.InteropServices; using System.Runtime.CompilerServices; namespace ConsoleApplication { public class Program { // Original version with async/await /* static async Task TestAsync() { Console.WriteLine("Enter TestAsync"); var awaiter = new Awaiter(); //var hold = GCHandle.Alloc(awaiter); var i = 0; while (true) { await awaiter; Console.WriteLine("tick: " + i++); } Console.WriteLine("Exit TestAsync"); } */ // Manually coded state machine version struct StateMachine: IAsyncStateMachine { public int _state; public Awaiter _awaiter; public AsyncTaskMethodBuilder _builder; public void MoveNext() { Console.WriteLine("StateMachine.MoveNext, state: " + this._state); switch (this._state) { case -1: { this._awaiter = new Awaiter(); goto case 0; }; case 0: { this._state = 0; var awaiter = this._awaiter; this._builder.AwaitOnCompleted(ref awaiter, ref this); return; }; default: throw new InvalidOperationException(); } } public void SetStateMachine(IAsyncStateMachine stateMachine) { Console.WriteLine("StateMachine.SetStateMachine, state: " + this._state); this._builder.SetStateMachine(stateMachine); // s_strongRef = stateMachine; } static object s_strongRef = null; } static Task TestAsync() { StateMachine stateMachine = new StateMachine(); stateMachine._state = -1; stateMachine._builder = AsyncTaskMethodBuilder.Create(); stateMachine._builder.Start(ref stateMachine); return stateMachine._builder.Task; } public static void Main(string[] args) { var task = TestAsync(); Thread.Sleep(1000); GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced); Console.WriteLine("Press Enter to exit..."); Console.ReadLine(); } // custom awaiter public class Awaiter : System.Runtime.CompilerServices.INotifyCompletion { Action _continuation; public Awaiter() { Console.WriteLine("Awaiter()"); } ~Awaiter() { Console.WriteLine("~Awaiter()"); } // resume after await, called upon external event public void Continue() { var continuation = Interlocked.Exchange(ref _continuation, null); if (continuation != null) continuation(); } // custom Awaiter methods public Awaiter GetAwaiter() { return this; } public bool IsCompleted { get { return false; } } public void GetResult() { } // INotifyCompletion public void OnCompleted(Action continuation) { Console.WriteLine("Awaiter.OnCompleted"); Volatile.Write(ref _continuation, continuation); } } } }
编译器生成的状态机是一个可变结构,由ref
传递。 显然,这是一个优化,以避免额外的分配。
其中核心部分发生在AsyncMethodBuilderCore.GetCompletionAction
,其中当前状态机结构被加框,并且对盒装副本的引用由传递给INotifyCompletion.OnCompleted
的延续回调INotifyCompletion.OnCompleted
。
这是对状态机的唯一引用,它有机会站在GC并在await
之后继续存在。 TestAsync
返回的Task
对象不保存对它的引用,只有await
continuation回调。 我相信这是故意的,以保持高效的GC行为。
注意注释行:
// s_strongRef = stateMachine;
如果我不对它进行评论,状态机的盒装副本就不会得到awaiter
,并且awaiter
它作为其中的一部分保持活着。 当然,这不是解决方案,但它说明了问题。
所以,我得出以下结论。 虽然异步操作处于“正在进行中”并且状态机的状态( MoveNext
)当前都没有被执行,但是继续回调的“守护者”的责任是强制保留回调本身,确保状态机的盒装副本不会被垃圾收集。
例如,在YieldAwaitable
(由Task.Yield
返回)的情况下,作为ThreadPool.QueueUserWorkItem
调用的结果, ThreadPool.QueueUserWorkItem
任务调度程序保留对continuation回调的外部引用。 对于Task.GetAwaiter
,它由任务对象间接引用 。
在我的例子中,延续回调的“守护者”是Awaiter
本身。
因此,只要CLR不知道(在状态机对象之外)继续回调的外部引用,自定义等待者应该采取措施使回调对象保持活动状态。 反过来,这将使整个状态机保持活力。 在这种情况下,以下步骤是必要的:
- 在
GCHandle.Alloc
上调用回调上的INotifyCompletion.OnCompleted
。 - 在异步事件实际发生之前调用
GCHandle.Free
,然后再调用continuation回调。 - 如果事件从未发生
GCHandle.Free
则实现IDispose
以调用GCHandle.Free
。
鉴于此,下面是原始计时器回调代码的一个版本,它可以正常工作。 注意,没有必要强制保持定时器回调委托( WaitOrTimerCallbackProc callback
)。它作为状态机的一部分保持活着。 更新 :正如@svick所指出的,此语句可能特定于状态机的当前实现(C#5.0)。 我添加了GC.KeepAlive(callback)
以消除对此行为的任何依赖,以防它在将来的编译器版本中发生更改。
using System; using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication { class Program { // Test task static async Task TestAsync(CancellationToken token) { using (var awaiter = new Awaiter()) { WaitOrTimerCallbackProc callback = (a, b) => awaiter.Continue(); try { IntPtr timerHandle; if (!CreateTimerQueueTimer(out timerHandle, IntPtr.Zero, callback, IntPtr.Zero, 500, 500, 0)) throw new System.ComponentModel.Win32Exception( Marshal.GetLastWin32Error()); try { var i = 0; while (true) { token.ThrowIfCancellationRequested(); await awaiter; Console.WriteLine("tick: " + i++); } } finally { DeleteTimerQueueTimer(IntPtr.Zero, timerHandle, IntPtr.Zero); } } finally { // reference the callback at the end // to avoid a chance for it to be GC'ed GC.KeepAlive(callback); } } } // Entry point static void Main(string[] args) { // cancel in 3s var testTask = TestAsync(new CancellationTokenSource(10 * 1000).Token); Thread.Sleep(1000); GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true); Thread.Sleep(2000); Console.WriteLine("Press Enter to GC..."); Console.ReadLine(); GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced); Console.WriteLine("Press Enter to exit..."); Console.ReadLine(); } // Custom awaiter public class Awaiter : System.Runtime.CompilerServices.INotifyCompletion, IDisposable { Action _continuation; GCHandle _hold = new GCHandle(); public Awaiter() { Console.WriteLine("Awaiter()"); } ~Awaiter() { Console.WriteLine("~Awaiter()"); } void ReleaseHold() { if (_hold.IsAllocated) _hold.Free(); } // resume after await, called upon external event public void Continue() { Action continuation; // it's OK to use lock (this) // the C# compiler would never do this, // because it's slated to work with struct awaiters lock (this) { continuation = _continuation; _continuation = null; ReleaseHold(); } if (continuation != null) continuation(); } // custom Awaiter methods public Awaiter GetAwaiter() { return this; } public bool IsCompleted { get { return false; } } public void GetResult() { } // INotifyCompletion public void OnCompleted(Action continuation) { lock (this) { ReleaseHold(); _continuation = continuation; _hold = GCHandle.Alloc(_continuation); } } // IDispose public void Dispose() { lock (this) { _continuation = null; ReleaseHold(); } } } // p/invoke delegate void WaitOrTimerCallbackProc(IntPtr lpParameter, bool TimerOrWaitFired); [DllImport("kernel32.dll")] static extern bool CreateTimerQueueTimer(out IntPtr phNewTimer, IntPtr TimerQueue, WaitOrTimerCallbackProc Callback, IntPtr Parameter, uint DueTime, uint Period, uint Flags); [DllImport("kernel32.dll")] static extern bool DeleteTimerQueueTimer(IntPtr TimerQueue, IntPtr Timer, IntPtr CompletionEvent); } }