C#: Asynchronous NamedPipeServerStream "The pipe is being closed" exception

My previous question on the same topic: C#: Asynchronous NamedPipeServerStream understanding. Now I have the following:

    private void StartListeningPipes()
    {
        try
        {
            isPipeWorking = true;
            namedPipeServerStream = new NamedPipeServerStream(PIPENAME, PipeDirection.InOut, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous, BUFFERSIZE, BUFFERSIZE);
            Console.Write("Waiting for client connection...");
            while (isPipeWorking)
            {
                IAsyncResult asyncResult = namedPipeServerStream.BeginWaitForConnection(this.WaitForConnectionAsyncCallback, null);
                Thread.Sleep(3 * 1000);
            }
        }
        //// Catch the IOException that is raised if the pipe is broken or disconnected.
        catch (IOException e)
        {
            Console.WriteLine("IOException: {0}. Restart pipe server...", e.Message);
            StopListeningPipes();
            StartListeningPipes();
        }
        //// Catch ObjectDisposedException if server was stopped. Then do nothing.
        catch (ObjectDisposedException)
        {
        }
    }

    private void WaitForConnectionAsyncCallback(IAsyncResult result)
    {
        try
        {
            namedPipeServerStream.EndWaitForConnection(result);
            Console.WriteLine("Client connected.");
            namedPipeServerStream.WaitForPipeDrain();
            byte[] buff = new byte[BUFFERSIZE];
            namedPipeServerStream.Read(buff, 0, BUFFERSIZE);
            string recStr = TrimNulls(buff);
            Array.Clear(buff, 0, buff.Length);
            Console.WriteLine();
            Console.WriteLine("'" + recStr + "'");
        }
        catch (Exception e)
        {
            Console.WriteLine("Error: " + e.Message);
        }
    }

But I get a "The pipe is being closed" exception every time I receive a message from the client.

Why?

My client:

    using (NamedPipeClientStream pipeStream = new NamedPipeClientStream(General.PIPENAME))
    {
        try
        {
            byte[] bytes = General.Iso88591Encoding.GetBytes(sendingMessage);
            pipeStream.Write(bytes, 0, bytes.Length);
            pipeStream.Flush();
            pipeStream.WaitForPipeDrain();
        }
        catch (TimeoutException)
        {
            Console.WriteLine("Timeout error!");
        }
        catch (Exception e)
        {
            // The {0} placeholder is needed for the message to be printed.
            Console.WriteLine(string.Format("Error! {0}", e.Message));
        }
    }

The final code so far is:

    /// <summary>
    /// Create new NamedPipeServerStream for listening to pipe client connection
    /// </summary>
    private void ListenForPipeClients()
    {
        if (!this.isListeningToClients)
            return;

        try
        {
            PipeSecurity ps = new PipeSecurity();
            PipeAccessRule par = new PipeAccessRule("Everyone", PipeAccessRights.ReadWrite, System.Security.AccessControl.AccessControlType.Allow);
            ps.AddAccessRule(par);
            pipeClientConnection = new NamedPipeServerStream(General.PIPENAME, PipeDirection.InOut, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous, General.BUFFERSIZE, General.BUFFERSIZE, ps);
            Console.Write("Waiting for client connection...");
            /*namedPipeServerStream.WaitForConnection();
            OnPipeConnected(namedPipeServerStream);*/
            IAsyncResult result = pipeClientConnection.BeginWaitForConnection(OnPipeConnected, pipeClientConnection);
        }
        catch (ObjectDisposedException)
        {
            //// Catch ObjectDisposedException if server was stopped. Then do nothing.
        }
        catch (Exception e)
        {
            Console.WriteLine("Error occurred: {0}. Restart pipe server...", e.Message);
            this.logger.Add(LogLevel.Warning, string.Format("Error occurred: {0}. Restart pipe server...", e.Message));
            ListenForPipeClients();
        }
    }

    /// <summary>
    /// Async callback on client connected action
    /// </summary>
    /// <param name="asyncResult">Async result</param>
    private void OnPipeConnected(IAsyncResult asyncResult)
    {
        using (var conn = (NamedPipeServerStream)asyncResult.AsyncState)
        {
            try
            {
                conn.EndWaitForConnection(asyncResult);
                Console.WriteLine("Client connected.");
                PipeClientConnection clientConnection = new PipeClientConnection(conn, notifierSenderCache, defaultStorageTime);
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);
                this.logger.Add(LogLevel.Warning, e.Message);
            }
        }

        ListenForPipeClients();
    }

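It looks like you need a separate NamedPipeServerStream for each client. (Note that I am not the one who found this out; see the other answers.) I think a working server side would look something like this (draft code):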

    while (this.isServerRunning)
    {
        var pipeClientConnection = new NamedPipeServerStream(...);
        try
        {
            pipeClientConnection.WaitForConnection();
        }
        catch (...)
        {
            ...
            continue;
        }

        ThreadPool.QueueUserWorkItem(state =>
        {
            // we need a separate variable here, so as not to make the lambda capture
            // the pipeClientConnection variable, which is not recommended in
            // multi-threaded scenarios
            using (var pipeClientConn = (NamedPipeServerStream)state)
            {
                // do stuff
                ...
            }
        }, pipeClientConnection);
    }
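For readers who want something they can run, here is a minimal sketch of the same blocking loop. It is not the answerer's code: the class name `BlockingPipeServer`, the `Serve` and `Create` helpers, the `clientCount` parameter, and the `received` queue are all made up for illustration, and the elided constructor arguments are filled with `NamedPipeServerStream.MaxAllowedServerInstances` as an assumption, so that the next instance can be created while the previous client is still being served. It also assumes each client sends one message and then closes its end of the pipe.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.IO.Pipes;
using System.Text;
using System.Threading;

static class BlockingPipeServer
{
    // Hypothetical helper: serves exactly clientCount clients, then returns.
    public static void Serve(string pipeName, int clientCount, ConcurrentQueue<string> received)
    {
        var listener = Create(pipeName);
        for (int i = 0; i < clientCount; i++)
        {
            // Blocks until a client connects to this instance.
            listener.WaitForConnection();

            // Hand the connected instance to a worker and create a fresh
            // instance for the next client: one server stream per client.
            var connected = listener;
            listener = Create(pipeName);

            ThreadPool.QueueUserWorkItem(state =>
            {
                using (var conn = (NamedPipeServerStream)state)
                using (var data = new MemoryStream())
                {
                    // Read until the client closes its end of the pipe.
                    conn.CopyTo(data);
                    received.Enqueue(Encoding.UTF8.GetString(data.ToArray()));
                }
            }, connected);
        }

        // The spare instance created after the last client never gets used.
        listener.Dispose();
    }

    // Assumption: unlimited instances, so a new one can exist while an old one is busy.
    private static NamedPipeServerStream Create(string pipeName)
    {
        return new NamedPipeServerStream(pipeName, PipeDirection.InOut,
            NamedPipeServerStream.MaxAllowedServerInstances);
    }
}
```

A real server would loop on a running flag instead of a fixed client count, and would likely need error handling around `WaitForConnection` as in the draft above.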

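As a side note, as was pointed out in the comments on your question, you are wasting memory by calling BeginWaitForConnection in a loop, which starts a new asynchronous call every 3 seconds. (The only case where this does not waste memory is when new connections arrive at intervals shorter than 3 seconds, but I doubt you can know that for sure.) As you can see, you are basically starting a new asynchronous call every 3 seconds, regardless of whether the last one is still pending or has completed. Furthermore, once again, it does not take into account that each client needs a separate NamedPipeServerStream.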

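To fix this, you need to eliminate the loop and "chain" the BeginWaitForConnection calls using the callback method. This is a pattern you will see quite often in asynchronous I/O when using .NET. Draft code: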

    private void StartListeningPipes()
    {
        if (!this.isServerRunning)
        {
            return;
        }

        var pipeClientConnection = new NamedPipeServerStream(...);
        try
        {
            pipeClientConnection.BeginWaitForConnection(asyncResult =>
            {
                // note that the body of the lambda is not part of the outer try... catch block!
                using (var conn = (NamedPipeServerStream)asyncResult.AsyncState)
                {
                    try
                    {
                        conn.EndWaitForConnection(asyncResult);
                    }
                    catch (...)
                    {
                        ...
                    }

                    // we have a connection established, time to wait for new ones
                    // while this thread does its business with the client
                    // this may look like a recursive call, but it is not: remember,
                    // we're in a lambda expression
                    // if this bothers you, just export the lambda into a named private
                    // method, like you did in your question
                    StartListeningPipes();

                    // do business with the client
                    conn.WaitForPipeDrain();
                    ...
                }
            }, pipeClientConnection);
        }
        catch (...)
        {
            ...
        }
    }

The control flow will be something like this:

  • [main thread] StartListeningPipes(): creates a NamedPipeServerStream, initiates BeginWaitForConnection()
  • [threadpool thread 1] client #1 connects, BeginWaitForConnection() callback: EndWaitForConnection(), then StartListeningPipes()
  • [threadpool thread 1] StartListeningPipes(): creates a new NamedPipeServerStream, calls BeginWaitForConnection()
  • [threadpool thread 1] back in the BeginWaitForConnection() callback: gets down to business with the connected client (#1)
  • [threadpool thread 2] client #2 connects, BeginWaitForConnection() callback: …
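The control flow above can be sketched as a small self-contained class. This is only an illustration under stated assumptions, not the answerer's code: the class `ChainedPipeListener`, its `Received` queue, and the `Stop` method are hypothetical names, the elided constructor arguments are filled with `NamedPipeServerStream.MaxAllowedServerInstances` and `PipeOptions.Asynchronous`, and "doing business with the client" is reduced to reading everything the client sends until it closes the pipe.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.IO.Pipes;
using System.Text;

sealed class ChainedPipeListener
{
    private readonly string pipeName;
    private volatile bool isServerRunning = true;

    // Messages read from clients; hypothetical, for illustration only.
    public ConcurrentQueue<string> Received { get; } = new ConcurrentQueue<string>();

    public ChainedPipeListener(string pipeName)
    {
        this.pipeName = pipeName;
    }

    // Only prevents new waits from starting; a pending wait is left alone.
    public void Stop()
    {
        isServerRunning = false;
    }

    public void StartListening()
    {
        if (!isServerRunning)
        {
            return;
        }

        // A fresh server stream for every client.
        var server = new NamedPipeServerStream(pipeName, PipeDirection.InOut,
            NamedPipeServerStream.MaxAllowedServerInstances,
            PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
        server.BeginWaitForConnection(OnConnected, server);
    }

    private void OnConnected(IAsyncResult asyncResult)
    {
        using (var conn = (NamedPipeServerStream)asyncResult.AsyncState)
        {
            try
            {
                conn.EndWaitForConnection(asyncResult);
            }
            catch (Exception e) when (e is IOException || e is ObjectDisposedException)
            {
                // This connection attempt failed; keep listening with a new instance.
                StartListening();
                return;
            }

            // Chain the next wait before serving this client.
            StartListening();

            using (var data = new MemoryStream())
            {
                // Read until the client closes its end of the pipe.
                conn.CopyTo(data);
                Received.Enqueue(Encoding.UTF8.GetString(data.ToArray()));
            }
        }
    }
}
```

Calling `new ChainedPipeListener("some_pipe").StartListening()` returns immediately; every later step runs on thread pool threads, as in the list above.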

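I think this is a lot more difficult than using blocking I/O (in fact, I am not quite sure I got it right, so please point out any mistake you find), and it is also a lot more confusing.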

To pause the server in either example, you would obviously set the this.isServerRunning flag to false.
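One caveat: in both drafts the flag is only checked before a new wait starts, so a wait that is already pending keeps going until the next client connects. If that matters, one possible alternative (a different technique from the one in this answer) is `WaitForConnectionAsync` with a `CancellationToken`. The sketch below is an assumption-laden illustration: `CancellablePipeServer`, `RunAsync`, `Handle`, and the `received` queue are hypothetical names, and each client is assumed to send one message and then close the pipe.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.IO.Pipes;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

static class CancellablePipeServer
{
    // Accepts clients until stopToken is cancelled; returns how many were accepted.
    public static async Task<int> RunAsync(string pipeName,
        ConcurrentQueue<string> received, CancellationToken stopToken)
    {
        int served = 0;
        var listener = Create(pipeName);
        while (true)
        {
            try
            {
                await listener.WaitForConnectionAsync(stopToken);
            }
            catch (OperationCanceledException)
            {
                // Stop requested while waiting: release the idle instance.
                listener.Dispose();
                return served;
            }

            // One server stream per client: hand this one off, create the next.
            var connected = listener;
            listener = Create(pipeName);
            served++;
            _ = Task.Run(() => Handle(connected, received));
        }
    }

    private static void Handle(NamedPipeServerStream conn, ConcurrentQueue<string> received)
    {
        using (conn)
        using (var data = new MemoryStream())
        {
            // Read until the client closes its end of the pipe.
            conn.CopyTo(data);
            received.Enqueue(Encoding.UTF8.GetString(data.ToArray()));
        }
    }

    // PipeOptions.Asynchronous is used so the pending wait can be cancelled.
    private static NamedPipeServerStream Create(string pipeName)
    {
        return new NamedPipeServerStream(pipeName, PipeDirection.InOut,
            NamedPipeServerStream.MaxAllowedServerInstances,
            PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
    }
}
```

Stopping then means calling `Cancel()` on the `CancellationTokenSource` that produced `stopToken`; clients that are already connected are still served to completion by their own tasks.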

OK. Silly me. There should be one NamedPipeServerStream for each client. So once the asynchronous operation has completed, the NamedPipeServerStream must be re-created. Thanks to Multithreaded NamePipeServer in C#

It should be:

    while (isPipeWorking)
    {
        IAsyncResult asyncResult = namedPipeServerStream.BeginWaitForConnection(this.WaitForConnectionAsyncCallback, null);
        Thread.Sleep(3 * 1000);
        if (asyncResult.IsCompleted)
        {
            RestartPipeServer();
            break;
        }
    }