什么是ThreadPool服务器的async / await等价物?
我正在使用同步apis和线程池的tcp服务器看起来像这样:
TcpListener listener; void Serve(){ while(true){ var client = listener.AcceptTcpClient(); ThreadPool.QueueUserWorkItem(this.HandleConnection, client); //Or alternatively new Thread(HandleConnection).Start(client) } }
假设我的目标是在资源使用率最低的情况下处理尽可能多的并发连接,这似乎很快就会受到可用线程数量的限制。 我怀疑通过使用非阻塞任务apis,我将能够用更少的资源处理更多。
我最初的印象是这样的:
async Task Serve(){ while(true){ var client = await listener.AcceptTcpClientAsync(); HandleConnectionAsync(client); //fire and forget? } }
但令我印象深刻的是,这可能会导致瓶颈。 也许HandleConnectionAsync需要花费非常长的时间才能达到第一个await,并且会阻止主接受循环继续进行。 这只会使用一个线程,还是运行时会在多个线程上神奇地运行它看起来合适的东西?
有没有办法将这两种方法结合起来,这样我的服务器就可以使用它所需的线程数来满足正在运行的任务的数量,但是它不会在IO操作上不必要地阻塞线程?
在这种情况下,是否存在最大化吞吐量的惯用方法?
我让Framework管理线程并且不会创建任何额外的线程,除非我可能需要进行性能分析测试。 特别是,如果HandleConnectionAsync
中的调用主要是IO绑定的。
无论如何,如果你想在HandleConnectionAsync
的开头发布调用线程(调度程序),那么这是一个非常简单的解决方案。 您可以使用await Yield()
从ThreadPool
跳转到新线程。 如果您的服务器在初始线程(控制台应用程序,WCF服务)上没有安装任何同步上下文的执行环境中运行,那么这是有效的,这通常是TCP服务器的情况。
[已编辑]以下说明了这一点(代码最初来自此处 )。 注意,主while
循环不会显式创建任何线程:
using System; using System.Collections.Generic; using System.Net.Sockets; using System.Text; using System.Threading.Tasks; class Program { object _lock = new Object(); // sync lock List _connections = new List (); // pending connections // The core server task private async Task StartListener() { var tcpListener = TcpListener.Create(8000); tcpListener.Start(); while (true) { var tcpClient = await tcpListener.AcceptTcpClientAsync(); Console.WriteLine("[Server] Client has connected"); var task = StartHandleConnectionAsync(tcpClient); // if already faulted, re-throw any error on the calling context if (task.IsFaulted) task.Wait(); } } // Register and handle the connection private async Task StartHandleConnectionAsync(TcpClient tcpClient) { // start the new connection task var connectionTask = HandleConnectionAsync(tcpClient); // add it to the list of pending task lock (_lock) _connections.Add(connectionTask); // catch all errors of HandleConnectionAsync try { await connectionTask; // we may be on another thread after "await" } catch (Exception ex) { // log the error Console.WriteLine(ex.ToString()); } finally { // remove pending task lock (_lock) _connections.Remove(connectionTask); } } // Handle new connection private async Task HandleConnectionAsync(TcpClient tcpClient) { await Task.Yield(); // continue asynchronously on another threads using (var networkStream = tcpClient.GetStream()) { var buffer = new byte[4096]; Console.WriteLine("[Server] Reading from client"); var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length); var request = Encoding.UTF8.GetString(buffer, 0, byteCount); Console.WriteLine("[Server] Client wrote {0}", request); var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server"); await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length); Console.WriteLine("[Server] Response has been written"); } } // The entry point of the console app static void Main(string[] args) { Console.WriteLine("Hit Ctrl-C to exit."); new Program().StartListener().Wait(); } }
或者,代码可能如下所示,而不await Task.Yield()
。 注意,我将async
lambda传递给Task.Run
,因为我仍然希望从HandleConnectionAsync
异步API中受益,并在那里使用await
:
// Handle new connection private static Task HandleConnectionAsync(TcpClient tcpClient) { return Task.Run(async () => { using (var networkStream = tcpClient.GetStream()) { var buffer = new byte[4096]; Console.WriteLine("[Server] Reading from client"); var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length); var request = Encoding.UTF8.GetString(buffer, 0, byteCount); Console.WriteLine("[Server] Client wrote {0}", request); var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server"); await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length); Console.WriteLine("[Server] Response has been written"); } }); }
[更新]基于注释:如果这将是库代码,则执行环境确实是未知的,并且可能具有非默认同步上下文。 在这种情况下,我宁愿在池线程上运行主服务器循环(没有任何同步上下文):
private static Task StartListener() { return Task.Run(async () => { var tcpListener = TcpListener.Create(8000); tcpListener.Start(); while (true) { var tcpClient = await tcpListener.AcceptTcpClientAsync(); Console.WriteLine("[Server] Client has connected"); var task = StartHandleConnectionAsync(tcpClient); // if already faulted, re-throw any error on the calling context if (task.IsFaulted) task.Wait(); } }); }
这样,在StartListener
创建的所有子任务都不会受到客户端代码的同步上下文的影响。 所以,我不必在任何地方明确地调用Task.ConfigureAwait(false)
。
现有的答案已经正确地提出使用Task.Run(() => HandleConnection(client));
,但没有解释原因。
原因如下:您担心, HandleConnectionAsync
可能需要一些时间才能达到第一个等待。 如果您坚持使用异步IO(在本例中应该如此),这意味着HandleConnectionAsync
正在执行CPU绑定工作而不会阻塞任何内容。 这是线程池的完美案例。 它可以运行简短,无阻塞的CPU工作。
你是对的,在返回之前需要花费很长时间的HandleConnectionAsync
会限制接受循环(可能是因为其中存在大量的CPU绑定工作)。 如果您需要高频率的新连接,则应避免这种情况。
如果您确定没有重要的工作限制循环,则可以保存其他线程池Task
而不执行此操作。
或者,您可以同时运行多个接受。 替换await Serve();
通过(例如):
var serverTasks = Enumerable.Range(0, Environment.ProcessorCount) .Select(_ => Serve()); await Task.WhenAll(serverTasks);
这消除了可伸缩性问题。 注意, await
会在这里吞下除一个错误之外的所有错误。
尝试
TcpListener listener; void Serve(){ while(true){ var client = listener.AcceptTcpClient(); Task.Run(() => this.HandleConnection(client)); //Or alternatively new Thread(HandleConnection).Start(client) } }
根据Microsoft http://msdn.microsoft.com/en-AU/library/hh524395.aspx#BKMK_VoidReturnType ,不应使用void返回类型,因为它无法捕获exception。 正如您所指出的那样,您确实需要“即发即忘”的任务,因此我的结论是您必须始终返回Task(正如Microsoft所说),但您应该使用以下命令来捕获错误:
TaskInstance.ContinueWith(i => { /* exception handler */ }, TaskContinuationOptions.OnlyOnFaulted);
我用作证据的一个例子如下:
public static void Main() { Awaitable() .ContinueWith( i => { foreach (var exception in i.Exception.InnerExceptions) { Console.WriteLine(exception.Message); } }, TaskContinuationOptions.OnlyOnFaulted); Console.WriteLine("This needs to come out before my exception"); Console.ReadLine(); } public static async Task Awaitable() { await Task.Delay(3000); throw new Exception("Hey I can catch these pesky things"); }
您是否有任何理由需要接受异步连接? 我的意思是,等待任何客户端连接是否给你任何价值? 这样做的唯一原因是因为在等待连接时服务器中还有其他工作正在进行。 如果你有可能做这样的事情:
public async void Serve() { while (true) { var client = await _listener.AcceptTcpClientAsync(); Task.Factory.StartNew(() => HandleClient(client), TaskCreationOptions.LongRunning); } }
这样接受将释放当前线程离开选项以完成其他事情,并且处理在新线程上运行。 唯一的开销是产生一个新线程来处理客户端,然后再直接回到接受新连接。
编辑:刚刚意识到它与您编写的代码几乎相同。 我想我需要再次阅读你的问题,以便更好地理解你的实际问题:S
EDIT2:
有没有办法将这两种方法结合起来,这样我的服务器就可以使用它所需的线程数来满足正在运行的任务的数量,但是它不会在IO操作上不必要地阻塞线程?
认为我的解决方案实际回答了这个问 虽然真的有必要吗?
Edit3:Made Task.Factory.StartNew()实际上创建了一个新线程。