转向异步套接字并行,不仅仅是在使用TPL的非常密集的应用程序中并发

我正在编写一个使用Socket的应用程序,它将非常密集,然后我真的需要使用我们在大服务器中的每个核心。 我看到问题( 如何使用ThreadPool运行套接字线程并行? )这里在stackoverflow中只有一个答案指向此MSDN示例 。

但我认为它只指向如何使它并发而不是并行 ,这里有人问如何cpu密集是打开一个套接字 ,它看起来非常密集,有人告诉它不帮助TPL TaskFactory.FromAsync vs任务与阻止方法和某人在这里教会如何使用TaskFactory.FromAsync( 是否存在将现有的BeginXXX / EndXXX异步方法包装成异步任务的模式? )。

如何保持套接字操作并行和高性能,如果处理插槽问题,如断开连接,半连接套接字和消息边界是正常异步方式的头疼。 如果将TPL和Task放在一起,如何处理它。

看到:

using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Net.Sockets; using System.Text; using System.Threading; using System.Threading.Tasks; namespace skttool { public class StateObject { public Socket workSocket = null; public const int BufferSize = 1024; public byte[] buffer = new byte[BufferSize]; public int bytesRead = 0; public StringBuilder sb = new StringBuilder(); } public class tool { //------------------------------------------------- private ManualResetEvent evtConnectionDone = new ManualResetEvent(false); private Socket skttool = null; private bool running = false; private StateObject state = null; //------------------------------------------------- toolConfig _cfg; public tool(toolConfig cfg) { _cfg = cfg; } //------------------------------------------------- public void socketListeningSet() { IPEndPoint localEndPoint; Socket skttool; byte[] bytes = new Byte[1024]; localEndPoint = new IPEndPoint(IPAddress.Any, _cfg.addressPort); skttool = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); skttool.Bind(localEndPoint); skttool.Listen(_cfg.maxQtdSockets); } //------------------------------------------------- public void start() { running = true; Task T1 = Task.Factory.StartNew(socketListeningSet); T1.ContinueWith(prev => { while (running) { evtConnectionDone.Reset(); Task accepetChunk = Task.Factory.FromAsync( skttool.BeginAccept, skttool.EndAccept, accept, skttool, TaskCreationOptions.AttachedToParent); accepetChunk.ContinueWith(accept, TaskContinuationOptions.NotOnFaulted | TaskCreationOptions.AttachedToParent); evtConnectionDone.WaitOne(); } }); } //------------------------------------------------- void accept(Task accepetChunk) { state = new StateObject(); evtConnectionDone.Set(); state.workSocket = accepetChunk.Result; Task readChunk = Task.Factory.FromAsync( state.workSocket.BeginReceive, state.workSocket.EndReceive, state.buffer, state.bytesRead, state.buffer.Length - state.bytesRead, null, TaskCreationOptions.AttachedToParent); readChunk.ContinueWith(read, TaskContinuationOptions.NotOnFaulted | TaskCreationOptions.AttachedToParent); } //------------------------------------------------- void read(Task readChunk) { state.bytesRead += readChunk.Result; if (readChunk.Result > 0 && state.bytesRead < state.buffer.Length) { read(); return; } _data = doTask(_data); Task sendChunk = Task.Factory.FromAsync( state.workSocket.BeginSend, state.workSocket.EndSend, state.buffer, state.bytesRead, state.buffer.Length - state.bytesRead, null, TaskCreationOptions.AttachedToParent); sendChunk.ContinueWith(send, TaskContinuationOptions.NotOnFaulted | TaskCreationOptions.AttachedToParent); } //------------------------------------------------- void send(Task readChunk) { state.workSocket.Shutdown(SocketShutdown.Both); state.workSocket.Close(); } //------------------------------------------------- byte[] doTask(byte[] data) { return Array.Reverse(data); } //------------------------------------------------- } } 

请参阅此链接有关TPL和传统.NET异步编程 ,它没有回答,但也许可以帮助您。 有关于异步编程模型(APM)基于事件的异步模式(EAP)的信息