以非阻塞方式调用TaskCompletionSource.SetResult

我发现了TaskCompletionSource.SetResult(); 在返回之前调用等待任务的代码。 在我的情况下,导致死锁。

这是一个在普通Thread启动的简化版本

 void ReceiverRun() while (true) { var msg = ReadNextMessage(); TaskCompletionSource task = requests[msg.RequestID]; if(msg.Error == null) task.SetResult(msg); else task.SetException(new Exception(msg.Error)); } } 

代码的“异步”部分看起来像这样。

 await SendAwaitResponse("first message"); SendAwaitResponse("second message").Wait(); 

Wait实际上嵌套在非异步调用中。

SendAwaitResponse(简化)

 public static Task SendAwaitResponse(string msg) { var t = new TaskCompletionSource(); requests.Add(GetID(msg), t); stream.Write(msg); return t.Task; } 

我的假设是第二个SendAwaitResponse将在ThreadPool线程中执行,但它会在为ReceiverRun创建的线程中继续。

无论如何设置任务的结果而不继续等待代码?

该应用程序是一个控制台应用程

我发现了TaskCompletionSource.SetResult(); 在返回之前调用等待任务的代码。 在我的情况下,导致死锁。

是的,我有一篇博客文章记录了这一点(AFAIK没有记录在MSDN上)。 死锁发生的原因有两个:

  1. async和阻塞代码混合在一起(即async方法调用Wait )。
  2. 使用TaskContinuationOptions.ExecuteSynchronously计划任务延续。

我建议从最简单的解决方案开始:删除第一件事(1)。 即,不要混合asyncWait呼叫:

 await SendAwaitResponse("first message"); SendAwaitResponse("second message").Wait(); 

相反,使用await

 await SendAwaitResponse("first message"); await SendAwaitResponse("second message"); 

如果需要,可以在调用堆栈的其他位置Wait不是async方法中)。

这是我最推荐的解决方案。 但是,如果你想尝试删除第二件事(2),你可以做一些技巧:将SetResult包装在Task.Run以强制它进入一个单独的线程(我的AsyncEx库有*WithBackgroundContinuations扩展方法,确切地说,或者给你的线程一个实际的上下文(比如我的AsyncContext类型 )并指定ConfigureAwait(false) ,这将导致继续忽略ExecuteSynchronously标志 。

但是这些解决方案比分离async和阻塞代码要复杂得多。

作为旁注,请看一下TPL Dataflow ; 听起来你可能觉得它很有用。

由于您的应用程序是一个控制台应用程序,它在默认的同步上下文中运行,其中await continuation回调将在等待任务完成的同一线程上调用。 如果要在await SendAwaitResponse之后切换线程,可以使用await Task.Yield()来执行此操作:

 await SendAwaitResponse("first message"); await Task.Yield(); // will be continued on a pool thread // ... SendAwaitResponse("second message").Wait(); // so no deadlock 

你可以通过在Task.Result存储Thread.CurrentThread.ManagedThreadId并在await之后将它与当前线程的id进行比较来进一步改进这一点。 如果您仍在同一个线程上,请await Task.Yield()

虽然我知道SendAwaitResponse是你实际代码的简化版本,但它内部仍然是完全同步的(你在问题中展示它的方式)。 为什么你会期待任何线程切换?

无论如何,您可能应该重新设计逻辑,而不是假设您当前使用的是什么线程。 避免混合使用awaitTask.Wait()并使所有代码异步。 通常,可以在顶层(例如Main )内的某处坚持使用一个Wait() )。

[EDITED]ReceiverRun调用task.SetResult(msg)实际上将控制流转移到您await task的点 – 没有线程切换,因为默认的同步上下文的行为。 因此,执行实际消息处理的代码将接管ReceiverRun线程。 最终, SendAwaitResponse("second message").Wait()在同一个线程上调用SendAwaitResponse("second message").Wait() ,导致死锁。

下面是一个控制台应用程序代码,模仿您的示例。 它使用await Task.Yield() ProcessAsync await Task.Yield()来在单独的线程上安排继续,因此控制流返回到ReceiverRun并且没有死锁。

 using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; namespace ConsoleApplication { class Program { class Worker { public struct Response { public string message; public int threadId; } CancellationToken _token; readonly ConcurrentQueue _messages = new ConcurrentQueue(); readonly ConcurrentDictionary> _requests = new ConcurrentDictionary>(); public Worker(CancellationToken token) { _token = token; } string ReadNextMessage() { // using Thread.Sleep(100) for test purposes here, // should be using ManualResetEvent (or similar synchronization primitive), // depending on how messages arrive string message; while (!_messages.TryDequeue(out message)) { Thread.Sleep(100); _token.ThrowIfCancellationRequested(); } return message; } public void ReceiverRun() { LogThread("Enter ReceiverRun"); while (true) { var msg = ReadNextMessage(); LogThread("ReadNextMessage: " + msg); var tcs = _requests[msg]; tcs.SetResult(new Response { message = msg, threadId = Thread.CurrentThread.ManagedThreadId }); _token.ThrowIfCancellationRequested(); // this is how we terminate the loop } } Task SendAwaitResponse(string msg) { LogThread("SendAwaitResponse: " + msg); var tcs = new TaskCompletionSource(); _requests.TryAdd(msg, tcs); _messages.Enqueue(msg); return tcs.Task; } public async Task ProcessAsync() { LogThread("Enter Worker.ProcessAsync"); var task1 = SendAwaitResponse("first message"); await task1; LogThread("result1: " + task1.Result.message); // avoid deadlock for task2.Wait() with Task.Yield() // comment this out and task2.Wait() will dead-lock if (task1.Result.threadId == Thread.CurrentThread.ManagedThreadId) await Task.Yield(); var task2 = SendAwaitResponse("second message"); task2.Wait(); LogThread("result2: " + task2.Result.message); var task3 = SendAwaitResponse("third message"); // still on the same thread as with result 2, no deadlock for task3.Wait() task3.Wait(); LogThread("result3: " + task3.Result.message); var task4 = SendAwaitResponse("fourth message"); await task4; LogThread("result4: " + task4.Result.message); // avoid deadlock for task5.Wait() with Task.Yield() // comment this out and task5.Wait() will dead-lock if (task4.Result.threadId == Thread.CurrentThread.ManagedThreadId) await Task.Yield(); var task5 = SendAwaitResponse("fifth message"); task5.Wait(); LogThread("result5: " + task5.Result.message); LogThread("Leave Worker.ProcessAsync"); } public static void LogThread(string message) { Console.WriteLine("{0}, thread: {1}", message, Thread.CurrentThread.ManagedThreadId); } } static void Main(string[] args) { Worker.LogThread("Enter Main"); var cts = new CancellationTokenSource(5000); // cancel after 5s var worker = new Worker(cts.Token); Task receiver = Task.Run(() => worker.ReceiverRun()); Task main = worker.ProcessAsync(); try { Task.WaitAll(main, receiver); } catch (Exception e) { Console.WriteLine("Exception: " + e.Message); } Worker.LogThread("Leave Main"); Console.ReadLine(); } } } 

这与在Task.Run(() => task.SetResult(msg))执行Task.Run(() => task.SetResult(msg))没有太大区别。 我能想到的唯一优势是你可以明确控制何时切换线程。 这样,您可以尽可能长时间保持在同一个线程上(例如,对于task2task3task4 ,但是在task4之后仍然需要另一个线程切换以避免task5.Wait()上的死锁)。

这两种解决方案最终都会使线程池增长,这在性能和可伸缩性方面都很糟糕。

现在,如果我们在上面的代码中将task.Wait()替换为ProcessAsync内的任何地方await task ,我们将不必使用await Task.Yield并且仍然没有死锁。 但是,在ProcessAsync的第一个await task1之后的整个await调用实际上将在ReceiverRun线程上执行。 只要我们不用其他Wait()式调用来阻止这个线程,并且在我们处理消息时不做大量的CPU绑定工作,这种方法可能正常工作(异步IO绑定await调用仍然应该没问题,它们实际上可能触发隐式线程切换)。

也就是说,我认为您需要一个单独的线程,其上安装了序列化同步上下文来处理消息(类似于WindowsFormsSynchronizationContext )。 这就是包含awaits的异步代码应该运行的地方。 您仍然需要避免在该线程上使用Task.Wait 。 如果单个消息处理需要大量CPU限制工作,则应使用Task.Run进行此类工作。 对于异步IO绑定调用,您可以保持在同一个线程上。

您可能希望从@StephenCleary的Nito异步库中查看ActionDispatcher / ActionDispatcherSynchronizationContext ,以获取异步消息处理逻辑。 希望斯蒂芬跳进来并提供更好的答案。

“我的假设是第二个SendAwaitResponse将在ThreadPool线程中执行,但它会在为ReceiverRun创建的线程中继续。”

它完全取决于您在SendAwaitResponse中执行的操作。 异步和并发不是一回事 。

退房: C#5 Async / Await – 是*并发*?