TaskContinuationOptions.RunContinuations异步和Stack Dives

在这篇博客文章中 ,Stephan Toub描述了一个将包含在.NET 4.6中的新function,它为TaskCreationOptions和TaskContinuationOptions枚举增加了另一个名为RunContinuationsAsynchronously

他解释说:

“我谈到了在TaskCompletionSource上调用{Try} Set *方法的分支,从TaskCompletionSource的Task的任何同步延续都可以作为调用的一部分同步运行。如果我们在这里调用SetResult同时持有锁,那么同步延续关闭该任务将在持有锁的同时运行,这可能会导致非常真实的问题。因此,在持有锁时我们抓住TaskCompletionSource来完成,但是我们还没有完成它,延迟这样做直到锁定已被释放“

并给出以下示例来演示:

 private SemaphoreSlim _gate = new SemaphoreSlim(1, 1); private async Task WorkAsync() { await _gate.WaitAsync().ConfigureAwait(false); try { // work here } finally { _gate.Release(); } } 

现在假设您有很多对WorkAsync的调用:

 await Task.WhenAll(from i in Enumerable.Range(0, 10000) select WorkAsync()); 

我们刚刚创建了10,000个对WorkAsync的调用,这些调用将在信号量上进行适当的序列化。 其中一个任务将进入关键区域,其他任务将在WaitAsync调用上排队,SemaphoreSlim内部有效地将任务调用完成,当有人调用Release时。 如果Release同步完成了Task,那么当第一个任务调用Release时,它将同步开始执行第二个任务,当它调用Release时,它将同步开始执行第三个任务,依此类推。 如果上面代码的“// work here”部分没有包含任何等待产生的东西,那么我们可能会在这里堆叠潜水并最终可能导致堆栈爆炸。

我很难掌握他谈论同步执行延续的部分。

怎么可能导致堆栈潜水? 更重要的是,为了解决这个问题, RunContinuationsAsynchronouslyRunContinuationsAsynchronously地做些什么?

这里的关键概念是任务的延续可以在完成先前任务的同一线程上同步运行。

让我们假设这是SemaphoreSlim.Release的实现(它实际上是Toub的AsyncSemphore的):

 public void Release() { TaskCompletionSource toRelease = null; lock (m_waiters) { if (m_waiters.Count > 0) toRelease = m_waiters.Dequeue(); else ++m_currentCount; } if (toRelease != null) toRelease.SetResult(true); } 

我们可以看到它同步完成一项任务(使用TaskCompletionSource )。 在这种情况下,如果WorkAsync没有其他异步点(即根本没有await s,或者所有await s都在已完成的任务上)并且调用_gate.Release()可以同步完成对_gate.WaitAsync()的挂起调用。同一个线程你可能会达到一个状态,在这个状态下,一个线程顺序释放信号量,完成下一个待处理的调用,执行// work here ,然后再次释放信号等。等等。

这意味着同一个线程在堆栈中越来越深,因此堆栈潜水。

RunContinuationsAsynchronously确保延续不同步运行,因此释放信号量的线程继续运行,并为另一个线程调度延续(哪一个依赖于其他延续参数,例如TaskScheduler

这在逻辑上类似于将完成发布到ThreadPool

 public void Release() { TaskCompletionSource toRelease = null; lock (m_waiters) { if (m_waiters.Count > 0) toRelease = m_waiters.Dequeue(); else ++m_currentCount; } if (toRelease != null) Task.Run(() => toRelease.SetResult(true)); } 

怎么可能导致堆栈潜水? 更重要的是,为了解决这个问题,RunContinuations会异步地做些什么?

i3arnon非常好地解释了RunContinuationsAsynchronously引入RunContinuationsAsynchronously背后的原因。 我的回答与他的相反; 事实上,我写这篇文章也是为了我自己的参考(我自己也不记得从现在起半年内的任何细微之处:)

首先,让我们看看TaskCompletionSourceRunContinuationsAsynchronously选项与Task.Run(() => tcs.SetResult(result))或者喜欢的不同。 让我们尝试一个简单的控制台应用程序:

 using System; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplications { class Program { static void Main(string[] args) { ThreadPool.SetMinThreads(100, 100); Console.WriteLine("start, " + new { System.Environment.CurrentManagedThreadId }); var tcs = new TaskCompletionSource(); // test ContinueWith-style continuations (TaskContinuationOptions.ExecuteSynchronously) ContinueWith(1, tcs.Task); ContinueWith(2, tcs.Task); ContinueWith(3, tcs.Task); // test await-style continuations ContinueAsync(4, tcs.Task); ContinueAsync(5, tcs.Task); ContinueAsync(6, tcs.Task); Task.Run(() => { Console.WriteLine("before SetResult, " + new { System.Environment.CurrentManagedThreadId }); tcs.TrySetResult(true); Thread.Sleep(10000); }); Console.ReadLine(); } // log static void Continuation(int id) { Console.WriteLine(new { continuation = id, System.Environment.CurrentManagedThreadId }); Thread.Sleep(1000); } // await-style continuation static async Task ContinueAsync(int id, Task task) { await task.ConfigureAwait(false); Continuation(id); } // ContinueWith-style continuation static Task ContinueWith(int id, Task task) { return task.ContinueWith( t => Continuation(id), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); } } } 

请注意所有continuation如何在调用TrySetResult的同一线程上同步运行:

 start,{CurrentManagedThreadId = 1}
在SetResult之前,{CurrentManagedThreadId = 3}
 {continuation = 1,CurrentManagedThreadId = 3}
 {continuation = 2,CurrentManagedThreadId = 3}
 {continuation = 3,CurrentManagedThreadId = 3}
 {continuation = 4,CurrentManagedThreadId = 3}
 {continuation = 5,CurrentManagedThreadId = 3}
 {continuation = 6,CurrentManagedThreadId = 3}

现在如果我们不希望这种情况发生,我们希望每个延续都是异步运行(即,与其他连续并行,可能在另一个线程上,没有任何同步上下文)?

通过安装假的临时同步上下文,可以为await style continuation做一个技巧(更多细节在这里 ):

 public static class TaskExt { class SimpleSynchronizationContext : SynchronizationContext { internal static readonly SimpleSynchronizationContext Instance = new SimpleSynchronizationContext(); }; public static void TrySetResult(this TaskCompletionSource @this, TResult result, bool asyncAwaitContinuations) { if (!asyncAwaitContinuations) { @this.TrySetResult(result); return; } var sc = SynchronizationContext.Current; SynchronizationContext.SetSynchronizationContext(SimpleSynchronizationContext.Instance); try { @this.TrySetResult(result); } finally { SynchronizationContext.SetSynchronizationContext(sc); } } } 

现在,在我们的测试代码中使用tcs.TrySetResult(true, asyncAwaitContinuations: true)

 start,{CurrentManagedThreadId = 1}
在SetResult之前,{CurrentManagedThreadId = 3}
 {continuation = 1,CurrentManagedThreadId = 3}
 {continuation = 2,CurrentManagedThreadId = 3}
 {continuation = 3,CurrentManagedThreadId = 3}
 {continuation = 4,CurrentManagedThreadId = 4}
 {continuation = 5,CurrentManagedThreadId = 5}
 {continuation = 6,CurrentManagedThreadId = 6}

注意await continuation现在如何并行运行(尽管仍然在所有同步ContinueWith连续之后)。

这个asyncAwaitContinuations: true逻辑是一个hack,它只适用于await延续。 新的RunContinuationsAsynchronously使它适用于任何类型的延续,附加到TaskCompletionSource.Task

RunContinuationsAsynchronously另一个不错的方面是,任何计划在特定同步上下文上恢复的await样式连续将异步运行在该上下文上(使用SynchronizationContext.Post ,即使TCS.Task相同的上下文上完成(与TCS.SetResult的当前行为不同) TCS.SetResultTCS.SetResult style continuation也将由相应的任务调度程序异步运行(最常见的是TaskScheduler.DefaultTaskScheduler.FromCurrentSynchronizationContext )。它们不会通过TaskScheduler.TryExecuteTaskInline内联。我相信Stephen Toub已经澄清了对他博客文章的评论,也可以在CoreCLR的Task.cs中看到 。

为什么我们要担心在所有延续上强加不同步?

当我处理协同执行的async方法(协同例程)时,我通常需要它。

一个简单的例子是可暂停的异步处理:一个异步进程暂停/恢复另一个异步处理的执行。 它们的执行工作流在某些await点处同步,并且TaskCompletionSource直接或间接地用于这种类型的同步。

下面是一些现成的示例代码,它使用了Stephen Toub的PauseTokenSource 。 这里,一个async方法StartAndControlWorkAsync启动并定期暂停/恢复另一个async方法DoWorkAsync 。 尝试更改asyncAwaitContinuations: true to asyncAwaitContinuations: false并看到逻辑被完全破坏:

 using System; using System.Threading; using System.Threading.Tasks; namespace ConsoleApp { class Program { static void Main() { StartAndControlWorkAsync(CancellationToken.None).Wait(); } // Do some work which can be paused/resumed public static async Task DoWorkAsync(PauseToken pause, CancellationToken token) { try { var step = 0; while (true) { token.ThrowIfCancellationRequested(); Console.WriteLine("Working, step: " + step++); await Task.Delay(1000).ConfigureAwait(false); Console.WriteLine("Before await pause.WaitForResumeAsync()"); await pause.WaitForResumeAsync(); Console.WriteLine("After await pause.WaitForResumeAsync()"); } } catch (Exception e) { Console.WriteLine("Exception: {0}", e); throw; } } // Start DoWorkAsync and pause/resume it static async Task StartAndControlWorkAsync(CancellationToken token) { var pts = new PauseTokenSource(); var task = DoWorkAsync(pts.Token, token); while (true) { token.ThrowIfCancellationRequested(); Console.WriteLine("Press enter to pause..."); Console.ReadLine(); Console.WriteLine("Before pause requested"); await pts.PauseAsync(); Console.WriteLine("After pause requested, paused: " + pts.IsPaused); Console.WriteLine("Press enter to resume..."); Console.ReadLine(); Console.WriteLine("Before resume"); pts.Resume(); Console.WriteLine("After resume"); } } // Based on Stephen Toub's PauseTokenSource // http://blogs.msdn.com/b/pfxteam/archive/2013/01/13/cooperatively-pausing-async-methods.aspx // the main difference is to make sure that when the consumer-side code - which requested the pause - continues, // the producer-side code has already reached the paused (awaiting) state. // Eg a media player "Pause" button is clicked, gets disabled, playback stops, // and only then "Resume" button gets enabled public class PauseTokenSource { internal static readonly Task s_completedTask = Task.Delay(0); readonly object _lock = new Object(); bool _paused = false; TaskCompletionSource _pauseResponseTcs; TaskCompletionSource _resumeRequestTcs; public PauseToken Token { get { return new PauseToken(this); } } public bool IsPaused { get { lock (_lock) return _paused; } } // request a resume public void Resume() { TaskCompletionSource resumeRequestTcs = null; lock (_lock) { resumeRequestTcs = _resumeRequestTcs; _resumeRequestTcs = null; if (!_paused) return; _paused = false; } if (resumeRequestTcs != null) resumeRequestTcs.TrySetResult(true, asyncAwaitContinuations: true); } // request a pause (completes when paused state confirmed) public Task PauseAsync() { Task responseTask = null; lock (_lock) { if (_paused) return _pauseResponseTcs.Task; _paused = true; _pauseResponseTcs = new TaskCompletionSource(); responseTask = _pauseResponseTcs.Task; _resumeRequestTcs = null; } return responseTask; } // wait for resume request internal Task WaitForResumeAsync() { Task resumeTask = s_completedTask; TaskCompletionSource pauseResponseTcs = null; lock (_lock) { if (!_paused) return s_completedTask; _resumeRequestTcs = new TaskCompletionSource(); resumeTask = _resumeRequestTcs.Task; pauseResponseTcs = _pauseResponseTcs; _pauseResponseTcs = null; } if (pauseResponseTcs != null) pauseResponseTcs.TrySetResult(true, asyncAwaitContinuations: true); return resumeTask; } } // consumer side public struct PauseToken { readonly PauseTokenSource _source; public PauseToken(PauseTokenSource source) { _source = source; } public bool IsPaused { get { return _source != null && _source.IsPaused; } } public Task WaitForResumeAsync() { return IsPaused ? _source.WaitForResumeAsync() : PauseTokenSource.s_completedTask; } } } public static class TaskExt { class SimpleSynchronizationContext : SynchronizationContext { internal static readonly SimpleSynchronizationContext Instance = new SimpleSynchronizationContext(); }; public static void TrySetResult(this TaskCompletionSource @this, TResult result, bool asyncAwaitContinuations) { if (!asyncAwaitContinuations) { @this.TrySetResult(result); return; } var sc = SynchronizationContext.Current; SynchronizationContext.SetSynchronizationContext(SimpleSynchronizationContext.Instance); try { @this.TrySetResult(result); } finally { SynchronizationContext.SetSynchronizationContext(sc); } } } } 

我不想在这里使用Task.Run(() => tcs.SetResult(result)) ,因为当它们已被安排在具有适当的UI线程的异步运行时将连续性推送到ThreadPool是多余的。同步上下文。 同时,如果StartAndControlWorkAsyncDoWorkAsync都在相同的UI同步上下文上运行,我们也会进行堆栈tcs.SetResult(result)如果使用tcs.SetResult(result)而没有Task.RunSynchronizationContext.Post包装)。

现在, RunContinuationsAsynchronously可能是解决这个问题的最佳方法。