从流中不断读取?

我有一个Stream对象偶尔会获得一些数据,但是间隔时间不可预测。 流上出现的消息是明确定义的,并提前声明其有效负载的大小(大小是每个消息的前两个字节中包含的16位整数)。

我想要一个StreamWatcher类来检测Stream何时有一些数据。 一旦它发生,我想要引发一个事件,以便订阅的StreamProcessor实例可以处理新消息。

这可以通过C#事件完成,而无需直接使用Threads吗? 看起来它应该是直截了当的,但我无法完全理解设计这个问题的正确方法。

当你说不直接使用线程时,我假设你仍然希望通过异步调用间接使用它们,否则这将不是非常有用。

您需要做的就是包装Stream的异步方法并将结果存储在缓冲区中。 首先,让我们定义规范的事件部分:

 public delegate void MessageAvailableEventHandler(object sender, MessageAvailableEventArgs e); public class MessageAvailableEventArgs : EventArgs { public MessageAvailableEventArgs(int messageSize) : base() { this.MessageSize = messageSize; } public int MessageSize { get; private set; } } 

现在,异步读取流中的一个16位整数,并在准备就绪时报告:

 public class StreamWatcher { private readonly Stream stream; private byte[] sizeBuffer = new byte[2]; public StreamWatcher(Stream stream) { if (stream == null) throw new ArgumentNullException("stream"); this.stream = stream; WatchNext(); } protected void OnMessageAvailable(MessageAvailableEventArgs e) { var handler = MessageAvailable; if (handler != null) handler(this, e); } protected void WatchNext() { stream.BeginRead(sizeBuffer, 0, 2, new AsyncCallback(ReadCallback), null); } private void ReadCallback(IAsyncResult ar) { int bytesRead = stream.EndRead(ar); if (bytesRead != 2) throw new InvalidOperationException("Invalid message header."); int messageSize = sizeBuffer[1] << 8 + sizeBuffer[0]; OnMessageAvailable(new MessageAvailableEventArgs(messageSize)); WatchNext(); } public event MessageAvailableEventHandler MessageAvailable; } 

我认为就是这样。 这假设处理消息的任何类也可以访问Stream ,并准备基于事件中的消息大小同步或异步地读取它。 如果您希望观察者类实际读取整个消息,那么您将不得不添加更多代码来执行此操作。

通常的方法是使用Stream公开的.NET异步编程模式 。 本质上,您通过调用Stream.BeginRead开始异步读取,传递一个byte[]缓冲区和一个回调方法,该方法将在从流中读取数据时调用。 在回调方法中,您调用Stream.EndReadStream.EndRead传递给回调的IAsncResult参数。 EndRead的返回值告诉您读入缓冲区的字节数。

一旦以这种方式收到前几个字节,就可以再次调用BeginRead ,等待剩下的消息(如果第一次没有获得足够的数据)。 一旦你收到了整条信息,就可以举起活动。

是不是像在单独的线程中使用同步Stream.Read()方法一样使用Stream.BeginRead()?

是的,这可以做到。 将非阻塞Stream.BeginRead方法与AsyncCallback一起使用。 当数据可用时,将异步调用回调。 在回调中,调用Stream.EndRead来获取数据,并再次调用Stream.BeginRead以获取下一个数据块。 缓冲输入数据的字节数组,该数组足以容纳消息。 一旦字节数组已满(可能需要多次回调调用),请引发事件。 然后读取下一个消息大小,创建一个新的缓冲区,重复,完成。