C#multithreading聊天服务器,手柄断开连接
我正在寻找一种处理断开连接的方法,因为每次关闭客户端时,服务器都会停止工作。 我收到一条错误消息,它在此行中“无法读取超出流的末尾”:
string message = reader.ReadString();
此外,我需要一种方法来从客户端列表中删除断开连接的客户端。 这是我的代码:服务器
using System; using System.Threading; using System.Net.Sockets; using System.IO; using System.Net; using System.Collections.Generic; namespace Server { class Server { public static List clients = new List(); static void Main(string[] args) { IPAddress ip = IPAddress.Parse("127.0.0.1"); TcpListener ServerSocket = new TcpListener(ip, 14000); ServerSocket.Start(); Console.WriteLine("Server started."); while (true) { TcpClient clientSocket = ServerSocket.AcceptTcpClient(); clients.Add(clientSocket); handleClient client = new handleClient(); client.startClient(clientSocket); } } } public class handleClient { TcpClient clientSocket; public void startClient(TcpClient inClientSocket) { this.clientSocket = inClientSocket; Thread ctThread = new Thread(Chat); ctThread.Start(); } private void Chat() { while (true) { BinaryReader reader = new BinaryReader(clientSocket.GetStream()); while (true) { string message = reader.ReadString(); foreach (var client in Server.clients) { BinaryWriter writer = new BinaryWriter(client.GetStream()); writer.Write(message); } } } } } }
客户
using System; using System.Net.Sockets; using System.IO; using System.Threading; namespace Client { class Client { public static void Write() { TcpClient client = new TcpClient("127.0.0.1", 14000); while (true) { string str = Console.ReadLine(); BinaryWriter writer = new BinaryWriter(client.GetStream()); writer.Write(str); } } public static void Read() { TcpClient client = new TcpClient("127.0.0.1", 14000); while (true) { BinaryReader reader = new BinaryReader(client.GetStream()); Console.WriteLine(reader.ReadString()); } } static void Main(string[] args) { Thread Thread = new Thread(Write); Thread Thread2 = new Thread(Read); Thread.Start(); Thread2.Start(); } } }
每次关闭客户端时,服务器都会停止工作。 我收到一条错误消息,它“无法读取超出流的末尾”
从某种意义上说,这是完全正常的。 也就是说,当使用BinaryReader
,它的正常行为是在到达流末尾时抛出EndOfStreamException
。
为什么它到达流的末尾? 好吧,因为客户端断开连接,这就是流发生的事情。 在套接字级别,真正发生的是读取操作以0作为读取的字节数完成。 这表明客户端已正常关闭套接字,并且不会再发送任何数据。
在.NET API中,这被转换为NetworkStream
的末尾, TcpClient
用它来包装实际处理网络I / O的Socket
对象。 而这个NetworkStream
对象又由BinaryReader
对象包装。 并且BinaryReader
在到达流的末尾时抛出该exception。
请注意,您的代码实际上并未为用户提供关闭客户端的优雅方式。 他们必须使用Ctrl + C ,或彻底终止进程。 使用前者具有执行正常关闭套接字的偶然效果,但这只是因为.NET正在处理进程的终止并在对象上运行终结器,例如用于连接到服务器的TcpClient
对象和终结器调用Socket.Shutdown()
告诉服务器它正在关闭。
如果你要杀死进程(例如使用任务管理器),你会发现抛出了IOException
。 应始终准备好良好的网络代码以查看IOException
; 网络不可靠,确实发生了故障。 您希望做一些合理的操作,例如从连接中删除远程端点,而不是让整个程序崩溃。
现在,所有这一切,只是因为EndOfStreamException
是“正常”,这并不意味着你发布的代码是,或者无论如何都是正确的网络编程方式的一个例子。 你有很多问题:
- 没有明确的优雅关闭。
网络I / O提供了一种关闭连接的常规方法,包括在两个端点上进行握手以指示它们何时完成发送以及何时完成接收。 一个端点将指示已完成发送; 另一个会注意到这一点(使用上面提到的0字节读取),然后它自己表明它已完成发送和接收。
TcpClient
和NetworkStream
不会直接公开它,但您可以使用TcpClient.Client
属性来获取更好的正常闭包的Socket
对象,即一个端点可以指示它已完成发送,并且仍然能够等到另一个端点也完成发送。使用
TcpClient.Close()
方法断开连接就像挂断电话而不说“再见”。 使用Socket.Shutdown()
就像完成一个礼貌的电话“好吧,这就是我想说的一切……还有什么吗?” - 您正在使用
BinaryReader
但未正确处理EndOfStreamException
。 - 您的客户端使用两个连接与服务器通信。
网络I / O使用
Socket
对象,该对象支持全双工通信。 不需要创建第二个连接来进行读写操作。 单个连接就足够了,并且更好,因为当您将发送和接收分成两个连接时,您还需要向协议添加一些内容,以便服务器知道这两个连接代表一个客户端(您的代码实际上并不执行)。 - 断开连接后,客户端不会从服务器列表中删除(您在问题中注明了这一点)。
- 客户端列表不是线程安全的。
- 你的
Chat()
方法中有一个额外的“while(true)”。
我已经修改了您的原始示例以解决上述所有问题,我在此处介绍过:
Server Program.cs:
class Program { private static readonly object _lock = new object(); private static readonly List clients = new List (); public static TcpClient[] GetClients() { lock (_lock) return clients.ToArray(); } public static int GetClientCount() { lock (_lock) return clients.Count; } public static void RemoveClient(TcpClient client) { lock (_lock) clients.Remove(client); } static void Main(string[] args) { IPAddress ip = IPAddress.Parse("127.0.0.1"); TcpListener ServerSocket = new TcpListener(ip, 14000); ServerSocket.Start(); Console.WriteLine("Server started."); while (true) { TcpClient clientSocket = ServerSocket.AcceptTcpClient(); Console.WriteLine($"client connected: {clientSocket.Client.RemoteEndPoint}"); lock (_lock) clients.Add(clientSocket); handleClient client = new handleClient(); client.startClient(clientSocket); Console.WriteLine($"{GetClientCount()} clients connected"); } } }
服务器handleClient.cs:
public class handleClient { TcpClient clientSocket; public void startClient(TcpClient inClientSocket) { this.clientSocket = inClientSocket; Thread ctThread = new Thread(Chat); ctThread.Start(); } private void Chat() { BinaryReader reader = new BinaryReader(clientSocket.GetStream()); try { while (true) { string message = reader.ReadString(); foreach (var client in Program.GetClients()) { BinaryWriter writer = new BinaryWriter(client.GetStream()); writer.Write(message); } } } catch (EndOfStreamException) { Console.WriteLine($"client disconnecting: {clientSocket.Client.RemoteEndPoint}"); clientSocket.Client.Shutdown(SocketShutdown.Both); } catch (IOException e) { Console.WriteLine($"IOException reading from {clientSocket.Client.RemoteEndPoint}: {e.Message}"); } clientSocket.Close(); Program.RemoveClient(clientSocket); Console.WriteLine($"{Program.GetClientCount()} clients connected"); } }
客户计划.s:
class Program { private static readonly object _lock = new object(); private static bool _closed; public static void Write(TcpClient client) { try { string str; SocketShutdown reason = SocketShutdown.Send; while ((str = Console.ReadLine()) != "") { lock (_lock) { BinaryWriter writer = new BinaryWriter(client.GetStream()); writer.Write(str); if (_closed) { // Remote endpoint already said they are done sending, // so we're done with both sending and receiving. reason = SocketShutdown.Both; break; } } } client.Client.Shutdown(reason); } catch (IOException e) { Console.WriteLine($"IOException writing to socket: {e.Message}"); } } public static void Read(TcpClient client) { try { while (true) { try { BinaryReader reader = new BinaryReader(client.GetStream()); Console.WriteLine(reader.ReadString()); } catch (EndOfStreamException) { lock (_lock) { _closed = true; return; } } } } catch (IOException e) { Console.WriteLine($"IOException reading from socket: {e.Message}"); } } static void Main(string[] args) { TcpClient client = new TcpClient("127.0.0.1", 14000); Thread writeThread = new Thread(() => Write(client)); Thread readThread = new Thread(() => Read(client)); writeThread.Start(); readThread.Start(); writeThread.Join(); readThread.Join(); client.Close(); Console.WriteLine("client exiting"); } }
请注意,在大多数情况下,我没有解决您在代码中使用的不一致和非常规命名。 唯一的例外是客户端代码中的线程变量,因为我真的不喜欢与类型名称完全匹配的大写局部变量。
您还有其他一些问题,上面的代码修订版没有解决这些问题。 这些包括:
- 你正在使用
BinaryReader
。 这在很多方面都是一个烦人的课程。 我建议,特别是对于你只是处理文本的聊天服务器场景,你切换到使用StreamReader
/StreamWriter
。 - 关注点之间存在不正确的耦合/分离。 您的
Program
类具有服务器代码,服务器代码知道Program
类。 将服务器和客户端实现封装到它们自己的类中,与程序的主要入口点分开,并进一步将顶级服务器代码与每个客户端数据结构分离(使用C#的event
允许顶级服务器代码被通知重要事件,例如需要从列表中删除客户端,而不必让每个客户端数据结构实际知道顶级服务器对象,更别关注它客户名单)。 - 您应该提供一种机制来正常关闭服务器。
通常情况下,我会说这些都超出了这样的答案范围,这已经很长了。 我已经解决了你的代码中的直接问题,然后是一些,这在名义上是足够的。
但是,我一直想写几年前我写的基本网络编程示例的更新版本,作为一种“中间”示例,添加多个客户端支持,异步操作和使用最新的C#function(像async
/ await
)。 所以,我继续前进并花了一些时间来做到这一点。 我想我最终会把它发布到我的博客上……这是另一个项目。 与此同时,这是代码(注意这是一个完全从头开始的例子……这样做比尝试重新编写代码更有意义)…
此实现的大部分工作都在服务器和客户端共享的单个类中:
/// /// Represents a remote end-point for the chat server and clients /// public sealed class ConnectedEndPoint : IDisposable { private readonly object _lock = new object(); private readonly Socket _socket; private readonly StreamReader _reader; private readonly StreamWriter _writer; private bool _closing; /// /// Gets the address of the connected remote end-point /// public IPEndPoint RemoteEndPoint { get { return (IPEndPoint)_socket.RemoteEndPoint; } } /// /// Gets a representing the on-going read operation of the connection /// public Task ReadTask { get; } /// /// Connect to an existing remote end-point (server) and return the /// object representing the new connection /// /// The address of the remote end-point to connect to /// The callback which will be called when a line of text is read from the newly-created connection /// public static ConnectedEndPoint Connect(IPEndPoint remoteEndPoint, Action readCallback) { Socket socket = new Socket(SocketType.Stream, ProtocolType.Tcp); socket.Connect(remoteEndPoint); return new ConnectedEndPoint(socket, readCallback); } /// /// Asynchronously accept a new connection from a remote end-point /// /// The listening which will accept the connection /// The callback which will be called when a line of text is read from the newly-created connection /// public static async Task AcceptAsync(Socket listener, Action readCallback) { Socket clientSocket = await Task.Factory.FromAsync(listener.BeginAccept, listener.EndAccept, null); return new ConnectedEndPoint(clientSocket, readCallback); } /// /// Write a line of text to the connection, sending it to the remote end-point /// /// The line of text to write public void WriteLine(string text) { lock (_lock) { if (!_closing) { _writer.WriteLine(text); _writer.Flush(); } } } /// /// Initiates a graceful closure of the connection /// public void Shutdown() { _Shutdown(SocketShutdown.Send); } /// /// Implements /// public void Dispose() { _reader.Dispose(); _writer.Dispose(); _socket.Close(); } /// /// Constructor. Private -- use one of the factory methods to create new connections. /// /// The for the new connection /// The callback for reading lines on the new connection private ConnectedEndPoint(Socket socket, Action readCallback) { _socket = socket; Stream stream = new NetworkStream(_socket); _reader = new StreamReader(stream, Encoding.UTF8, false, 1024, true); _writer = new StreamWriter(stream, Encoding.UTF8, 1024, true); ReadTask = _ConsumeSocketAsync(readCallback); } private void _Shutdown(SocketShutdown reason) { lock (_lock) { if (!_closing) { _socket.Shutdown(reason); _closing = true; } } } private async Task _ConsumeSocketAsync(Action callback) { string line; while ((line = await _reader.ReadLineAsync()) != null) { callback(this, line); } _Shutdown(SocketShutdown.Both); } }
客户端程序将直接使用该类。 服务器端封装在另一个类中,在上面的相同DLL中找到:
/// /// Event arguments for the event /// public class StatusEventArgs : EventArgs { /// /// Gets the status text /// public string StatusText { get; } /// /// Constructor /// /// The status text public StatusEventArgs(string statusText) { StatusText = statusText; } } /// /// A server implementing a simple line-based chat server /// public class ChatServer { private readonly object _lock = new object(); private readonly Socket _listener; private readonly List _clients = new List (); private bool _closing; /// /// Gets a task representing the listening state of the servdere /// public Task ListenTask { get; } /// /// Raised when the server has status to report /// public event EventHandler Status; /// /// Constructor /// /// The port number the server should listen on public ChatServer(int port) { _listener = new Socket(SocketType.Stream, ProtocolType.Tcp); _listener.Bind(new IPEndPoint(IPAddress.Any, port)); _listener.Listen(int.MaxValue); ListenTask = _ListenAsync(); } /// /// Initiates a shutdown of the chat server. /// /// This method closes the listening socket, which will subsequently /// cause the listening task to inform any connected clients that the server /// is shutting down, and to wait for the connected clients to finish a graceful /// closure of their connections. /// public void Shutdown() { _listener.Close(); } private async Task _ListenAsync() { try { while (true) { ConnectedEndPoint client = await ConnectedEndPoint.AcceptAsync(_listener, _ClientReadLine); _AddClient(client); _CleanupClientAsync(client); } } catch (ObjectDisposedException) { _OnStatus("Server's listening socket closed"); } catch (IOException e) { _OnStatus($"Listening socket IOException: {e.Message}"); } await _CleanupServerAsync(); } private async Task _CleanupServerAsync() { ConnectedEndPoint[] clients; lock (_lock) { _closing = true; clients = _clients.ToArray(); } foreach (ConnectedEndPoint client in clients) { try { client.WriteLine("Chat server is shutting down"); } catch (IOException e) { _OnClientException(client, e.Message); } client.Shutdown(); } // Clients are expected to participate in graceful closure. If they do, // this will complete when all clients have acknowledged the shutdown. // In a real-world program, may be a good idea to include a timeout in // case of network issues or misbehaving/crashed clients. Implementing // the timeout is beyond the scope of this proof-of-concept demo code. try { await Task.WhenAll(clients.Select(c => c.ReadTask)); } catch (AggregateException) { // Actual exception for each client will have already // been reported by _CleanupClientAsync() } } // Top-level "clean-up" method, which will observe and report all exceptions // In real-world code, would probably want to simply log any unexpected exceptions // to a log file and then exit the process. Here, we just exit after reporting // exception info to caller. In either case, there's no need to observe a Task from // this method, and async void simplifies the call (no need to receive and then ignore // the Task object just to keep the compiler quiet). private async void _CleanupClientAsync(ConnectedEndPoint client) { try { await client.ReadTask; } catch (IOException e) { _OnClientException(client, e.Message); } catch (Exception e) { // Unexpected exceptions are programmer-error. They could be anything, and leave // the program in an unknown, possibly corrupt state. The only reasonable disposition // is to log, then exit. // // Full stack-trace, because who knows what this exception was. Will need the // stack-trace to do any diagnostic work. _OnStatus($"Unexpected client connection exception. {e}"); Environment.Exit(1); } finally { _RemoveClient(client); client.Dispose(); } } private void _ClientReadLine(ConnectedEndPoint readClient, string text) { _OnStatus($"Client {readClient.RemoteEndPoint}: \"{text}\""); lock (_lock) { if (_closing) { return; } text = $"{readClient.RemoteEndPoint}: {text}"; foreach (ConnectedEndPoint client in _clients.Where(c => c != readClient)) { try { client.WriteLine(text); } catch (IOException e) { _OnClientException(client, e.Message); } } } } private void _AddClient(ConnectedEndPoint client) { lock (_lock) { _clients.Add(client); _OnStatus($"added client {client.RemoteEndPoint} -- {_clients.Count} clients connected"); } } private void _RemoveClient(ConnectedEndPoint client) { lock (_lock) { _clients.Remove(client); _OnStatus($"removed client {client.RemoteEndPoint} -- {_clients.Count} clients connected"); } } private void _OnStatus(string statusText) { Status?.Invoke(this, new StatusEventArgs(statusText)); } private void _OnClientException(ConnectedEndPoint client, string message) { _OnStatus($"Client {client.RemoteEndPoint} IOException: {message}"); } }
在大多数情况下,这就是你所需要的一切。 上面的DLL代码(在我的示例中)由两个不同的程序(服务器和客户端)引用。
这是服务器:
class Program { private const int _kportNumber = 5678; static void Main(string[] args) { ChatServer server = new ChatServer(_kportNumber); server.Status += (s, e) => WriteLine(e.StatusText); Task serverTask = _WaitForServer(server); WriteLine("Press return to shutdown server..."); ReadLine(); server.Shutdown(); serverTask.Wait(); } private static async Task _WaitForServer(ChatServer server) { try { await server.ListenTask; } catch (Exception e) { WriteLine($"Server exception: {e}"); } } }
这是客户:
class Program { private const int _kportNumber = 5678; static void Main(string[] args) { IPEndPoint remoteEndPoint = new IPEndPoint(IPAddress.Loopback, _kportNumber); ConnectedEndPoint server = ConnectedEndPoint.Connect(remoteEndPoint, (c, s) => WriteLine(s)); _StartUserInput(server); _SafeWaitOnServerRead(server).Wait(); } private static void _StartUserInput(ConnectedEndPoint server) { // Get user input in a new thread, so main thread can handle waiting // on connection. new Thread(() => { try { string line; while ((line = ReadLine()) != "") { server.WriteLine(line); } server.Shutdown(); } catch (IOException e) { WriteLine($"Server {server.RemoteEndPoint} IOException: {e.Message}"); } catch (Exception e) { WriteLine($"Unexpected server exception: {e}"); Environment.Exit(1); } }) { // Setting IsBackground means this thread won't keep the // process alive. So, if the connection is closed by the server, // the main thread can exit and the process as a whole will still // be able to exit. IsBackground = true }.Start(); } private static async Task _SafeWaitOnServerRead(ConnectedEndPoint server) { try { await server.ReadTask; } catch (IOException e) { WriteLine($"Server {server.RemoteEndPoint} IOException: {e.Message}"); } catch (Exception e) { // Should never happen. It's a bug in this code if it does. WriteLine($"Unexpected server exception: {e}"); } } }
在我看来,上面提到的最重要的事情之一是ConnectedEndPoint
和ChatServer
类对使用它们的类没有依赖性。 通过使用回调委托和事件,依赖于这些类的代码能够双向交互,而这些支持类不必知道代码所在的类型(参见“控制反转”,这是一个变体)的)。
您可以越多地使代码关系看起来像只有单向引用的树,编写代码就越容易,以后再进行维护。
注意:为了便于说明,我使用了事件和回调代理。 这两种方法都可以自行运行。 主要的权衡是复杂性与灵活性。 使用事件使代码更加灵活 – 可以根据需要添加和删除事件处理程序 – 但是如果使用带有sender
和EventArgs
参数的方法签名的.NET约定来实现事件,则它比某些程序更“重量级”。在创建有问题的对象时只传递一个简单的回调委托。 我在代码中给出了每个示例,您可以在哪些情况下决定您喜欢哪种方法。
您还会注意到,上面大量使用了C#的异步function。 起初,这可能会使代码看起来更难阅读。 但事实上,使用这些function实际上要比使用旧的BeginXXX()
/ EndXXX()
方法更加容易,或者,天堂禁止,为每个连接专用一个线程( 非常扩展)客户数量上升时表现不佳)。 通过这种方式,绝对值得习惯于考虑本质上异步的操作,例如网络I / O.