C#:实现NetworkStream.Peek?

目前,C#中没有NetworkStream.Peek方法。 实现像NetworkStream.ReadByte这样的函数的最佳方法是什么,除了返回的byte实际上没有从Stream删除?

如果您不需要实际检索字节,则可以引用DataAvailable属性。

否则,您可以使用StreamReader将其包装并调用其Peek方法。

请注意,由于延迟问题,这些都不能特别可靠地从网络流中读取。 您偷看之后的瞬间数据可能变得可用(存在于读缓冲区中)。

我不确定你打算用它做什么,但是NetworkStream上的Read方法是一个阻塞调用,所以你不需要检查状态,即使你是在接收块。 如果您在从流中读取时尝试保持应用程序响应,则应使用线程或异步调用来接收数据。

编辑:根据这篇文章 , StreamReader.PeekNetworkStream上是错误的,或者至少有未记录的行为,所以如果你选择走那条路线要小心。


更新 – 对评论的回复

对实际流本身进行“窥视”的概念实际上是不可能的; 它只是一个流,一旦收到字节,它就不再在流上了。 有些流支持搜索,因此您可以在技术上重新读取该字节,但NetworkStream不是其中之一。

偷看只适用于将流读入缓冲区的情况; 一旦数据在缓冲区中,那么偷看很容易,因为你只需要检查缓冲区中当前位置的内容。 这就是StreamReader能够做到这一点的原因; 没有Stream类通常会有自己的Peek方法。

现在,具体来说,对于这个问题,我怀疑这是否真的是正确的答案。 我理解动态选择处理流的方法的想法,但你真的需要在原始流上执行此操作吗? 您是否可以先将流读入字节数组,或者甚至将其复制到MemoryStream ,然后从该点开始处理它?

我看到的主要问题是,如果在从网络流中读取时发生了不好的事情,那么数据就会消失。 但如果您先将其读入临时位置,则可以对其进行调试。 您可以找出数据是什么以及为什么尝试处理数据的对象在中途失败。

通常,您想要对NetworkStream执行的第一件事是将其读入本地缓冲区。 我能想到不这样做的唯一原因是,如果你正在读取大量数据 – 即便如此,如果它不适合内存,我可能会考虑使用文件系统作为中间缓冲区。

我不知道你的确切要求,但从我到目前为止所学到的,我的建议是:不要试图直接从NetworkStream处理你的数据,除非有令人信服的理由这样做。 考虑首先将数据读入内存或磁盘,然后处理副本。

我遇到了同样的’偷看魔术数字,然后决定将流发送到’要求的流处理器,不幸的是,我无法摆脱那个问题 – 正如对Aaronaught的答案的评论所建议的那样 – 通过传递已经消耗的字节在单独的参数中进入流处理方法,因为那些方法是给定的,他们期望System.IO.Stream而不是别的。

我通过创建一个包含Stream的或多或少通用的PeekableStream类来解决这个问题。 它适用于NetworkStreams,但也适用于任何其他Stream,只要你Stream.CanRead它。


编辑

或者,你可以使用全新的ReadSeekableStream来做

 var readSeekableStream = new ReadSeekableStream(networkStream, /* >= */ count); ... readSeekableStream.Read(..., count); readSeekableStream.Seek(-count, SeekOrigin.Current); 

无论如何, PeekableStream

 ///  /// PeekableStream wraps a Stream and can be used to peek ahead in the underlying stream, /// without consuming the bytes. In other words, doing Peek() will allow you to look ahead in the stream, /// but it won't affect the result of subsequent Read() calls. /// /// This is sometimes necessary, eg for peeking at the magic number of a stream of bytes and decide which /// stream processor to hand over the stream. ///  public class PeekableStream : Stream { private readonly Stream underlyingStream; private readonly byte[] lookAheadBuffer; private int lookAheadIndex; public PeekableStream(Stream underlyingStream, int maxPeekBytes) { this.underlyingStream = underlyingStream; lookAheadBuffer = new byte[maxPeekBytes]; } protected override void Dispose(bool disposing) { if (disposing) underlyingStream.Dispose(); base.Dispose(disposing); } ///  /// Peeks at a maximum of count bytes, or less if the stream ends before that number of bytes can be read. /// /// Calls to this method do not influence subsequent calls to Read() and Peek(). /// /// Please note that this method will always peek count bytes unless the end of the stream is reached before that - in contrast to the Read() /// method, which might read less than count bytes, even though the end of the stream has not been reached. ///  /// An array of bytes. When this method returns, the buffer contains the specified byte array with the values between offset and /// (offset + number-of-peeked-bytes - 1) replaced by the bytes peeked from the current source. /// The zero-based byte offset in buffer at which to begin storing the data peeked from the current stream. /// The maximum number of bytes to be peeked from the current stream. /// The total number of bytes peeked into the buffer. If it is less than the number of bytes requested then the end of the stream has been reached. public virtual int Peek(byte[] buffer, int offset, int count) { if (count > lookAheadBuffer.Length) throw new ArgumentOutOfRangeException("count", "must be smaller than peekable size, which is " + lookAheadBuffer.Length); while (lookAheadIndex < count) { int bytesRead = underlyingStream.Read(lookAheadBuffer, lookAheadIndex, count - lookAheadIndex); if (bytesRead == 0) // end of stream reached break; lookAheadIndex += bytesRead; } int peeked = Math.Min(count, lookAheadIndex); Array.Copy(lookAheadBuffer, 0, buffer, offset, peeked); return peeked; } public override bool CanRead { get { return true; } } public override long Position { get { return underlyingStream.Position - lookAheadIndex; } set { underlyingStream.Position = value; lookAheadIndex = 0; // this needs to be done AFTER the call to underlyingStream.Position, as that might throw NotSupportedException, // in which case we don't want to change the lookAhead status } } public override int Read(byte[] buffer, int offset, int count) { int bytesTakenFromLookAheadBuffer = 0; if (count > 0 && lookAheadIndex > 0) { bytesTakenFromLookAheadBuffer = Math.Min(count, lookAheadIndex); Array.Copy(lookAheadBuffer, 0, buffer, offset, bytesTakenFromLookAheadBuffer); count -= bytesTakenFromLookAheadBuffer; offset += bytesTakenFromLookAheadBuffer; lookAheadIndex -= bytesTakenFromLookAheadBuffer; if (lookAheadIndex > 0) // move remaining bytes in lookAheadBuffer to front // copying into same array should be fine, according to http://msdn.microsoft.com/en-us/library/z50k9bft(v=VS.90).aspx : // "If sourceArray and destinationArray overlap, this method behaves as if the original values of sourceArray were preserved // in a temporary location before destinationArray is overwritten." Array.Copy(lookAheadBuffer, lookAheadBuffer.Length - bytesTakenFromLookAheadBuffer + 1, lookAheadBuffer, 0, lookAheadIndex); } return count > 0 ? bytesTakenFromLookAheadBuffer + underlyingStream.Read(buffer, offset, count) : bytesTakenFromLookAheadBuffer; } public override int ReadByte() { if (lookAheadIndex > 0) { lookAheadIndex--; byte firstByte = lookAheadBuffer[0]; if (lookAheadIndex > 0) // move remaining bytes in lookAheadBuffer to front Array.Copy(lookAheadBuffer, 1, lookAheadBuffer, 0, lookAheadIndex); return firstByte; } else { return underlyingStream.ReadByte(); } } public override long Seek(long offset, SeekOrigin origin) { long ret = underlyingStream.Seek(offset, origin); lookAheadIndex = 0; // this needs to be done AFTER the call to underlyingStream.Seek(), as that might throw NotSupportedException, // in which case we don't want to change the lookAhead status return ret; } // from here on, only simple delegations to underlyingStream public override bool CanSeek { get { return underlyingStream.CanSeek; } } public override bool CanWrite { get { return underlyingStream.CanWrite; } } public override bool CanTimeout { get { return underlyingStream.CanTimeout; } } public override int ReadTimeout { get { return underlyingStream.ReadTimeout; } set { underlyingStream.ReadTimeout = value; } } public override int WriteTimeout { get { return underlyingStream.WriteTimeout; } set { underlyingStream.WriteTimeout = value; } } public override void Flush() { underlyingStream.Flush(); } public override long Length { get { return underlyingStream.Length; } } public override void SetLength(long value) { underlyingStream.SetLength(value); } public override void Write(byte[] buffer, int offset, int count) { underlyingStream.Write(buffer, offset, count); } public override void WriteByte(byte value) { underlyingStream.WriteByte(value); } } 

如果您有权访问Socket对象,则可以尝试使用Receive方法 ,并传递SocketFlags.Peek 。 这类似于可以传递给BSD套接字或Winsock中的recv调用的MSG_PEEK标志。

这是一个非常简单的PeekStream实现,它允许您仅在流的开头查看一定数量的字节(而不是能够随时查看)。 隐藏的字节作为Stream本身返回,以最小化对现有代码的更改。

这是你如何使用它:

 Stream nonSeekableStream = ...; PeekStream peekStream = new PeekStream(nonSeekableStream, 30); // Peek max 30 bytes Stream initialBytesStream = peekStream.GetInitialBytesStream(); ParseHeaders(initialBytesStream); // Work on initial bytes of nonSeekableStream peekStream.Read(...) // Read normally, the read will start from the beginning 

GetInitialBytesStream()返回一个可搜索的流,该流包含最多peekSize底层流的初始字节(如果流比peekSize短,则peekSize )。

由于它的简单性,读取PeekStream应该只是略微慢(如果有的话)比直接读取底层流。

 public class PeekStream : Stream { private Stream m_stream; private byte[] m_buffer; private int m_start; private int m_end; public PeekStream(Stream stream, int peekSize) { if (stream == null) { throw new ArgumentNullException("stream"); } if (!stream.CanRead) { throw new ArgumentException("Stream is not readable."); } if (peekSize < 0) { throw new ArgumentOutOfRangeException("peekSize"); } m_stream = stream; m_buffer = new byte[peekSize]; m_end = stream.Read(m_buffer, 0, peekSize); } public override bool CanRead { get { return true; } } public override bool CanWrite { get { return false; } } public override bool CanSeek { get { return false; } } public override long Length { get { throw new NotSupportedException(); } } public override long Position { get { throw new NotSupportedException(); } set { throw new NotSupportedException(); } } public MemoryStream GetInitialBytesStream() { return new MemoryStream(m_buffer, 0, m_end, false); } public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } public override void SetLength(long value) { throw new NotSupportedException(); } public override int Read(byte[] buffer, int offset, int count) { // Validate arguments if (buffer == null) { throw new ArgumentNullException("buffer"); } if (offset < 0) { throw new ArgumentOutOfRangeException("offset"); } if (offset + count > buffer.Length) { throw new ArgumentOutOfRangeException("count"); } int totalRead = 0; // Read from buffer if (m_start < m_end) { int toRead = Math.Min(m_end - m_start, count); Array.Copy(m_buffer, m_start, buffer, offset, toRead); m_start += toRead; offset += toRead; count -= toRead; totalRead += toRead; } // Read from stream if (count > 0) { totalRead += m_stream.Read(buffer, offset, count); } // Return total bytes read return totalRead; } public override void Write(byte[] buffer, int offset, int count) { throw new NotImplementedException(); } public override int ReadByte() { if (m_start < m_end) { return m_buffer[m_start++]; } else { return m_stream.ReadByte(); } } public override void Flush() { m_stream.Flush(); } protected override void Dispose(bool disposing) { if (disposing) { m_stream.Dispose(); } base.Dispose(disposing); } } 

免责声明:上面的PeekStream取自一个工作程序,但它没有经过全面测试,因此可能包含错误。 它适用于我,但你可能会发现一些失败的极端情况。