在C#中缓冲字节数据

我的应用程序从TCP套接字读取字节并需要缓冲它们,以便我可以稍后从中提取消息。 由于TCP的性质,我可能在一次读取中得到部分或多个消息,因此在每次读取之后,我想检查缓冲区并提取尽可能多的完整消息。

因此,我想要一个允许我执行以下操作的课程:

  • 将任意byte []数据附加到它
  • 在不消费内容的情况下检查内容,特别是检查内容量并搜索某个字节或字节的存在
  • 以byte []的forms提取和使用部分数据,而将其余部分留在那里以备将来读取

我希望我想要的东西可以用.NET库中的一个或多个现有类完成,但我不确定哪些类。 System.IO.MemoryStream看起来接近我想要的,但是(a)不清楚它是否适合用作缓冲区(读取数据是否从容量中删除?)和(b)读写似乎发生在同一个地方 – “流的当前位置是下一次读取或写入操作可能发生的位置。” – 这不是我想要的。 我需要写到最后并从前面阅读。

只需使用一个大字节数组和Array.Copy – 它应该可以解决问题。 如果没有,请使用List

如果你使用数组,你必须自己实现一个索引(你在那里复制其他数据)(同样用于检查内容大小),但它很简单。

如果您感兴趣:这是一个“循环缓冲区”的简单实现。 测试应该运行(我在它上面进行了几次unit testing,但没有检查所有关键路径):

 public class ReadWriteBuffer { private readonly byte[] _buffer; private int _startIndex, _endIndex; public ReadWriteBuffer(int capacity) { _buffer = new byte[capacity]; } public int Count { get { if (_endIndex > _startIndex) return _endIndex - _startIndex; if (_endIndex < _startIndex) return (_buffer.Length - _startIndex) + _endIndex; return 0; } } public void Write(byte[] data) { if (Count + data.Length > _buffer.Length) throw new Exception("buffer overflow"); if (_endIndex + data.Length >= _buffer.Length) { var endLen = _buffer.Length - _endIndex; var remainingLen = data.Length - endLen; Array.Copy(data, 0, _buffer, _endIndex, endLen); Array.Copy(data, endLen, _buffer, 0, remainingLen); _endIndex = remainingLen; } else { Array.Copy(data, 0, _buffer, _endIndex, data.Length); _endIndex += data.Length; } } public byte[] Read(int len, bool keepData = false) { if (len > Count) throw new Exception("not enough data in buffer"); var result = new byte[len]; if (_startIndex + len < _buffer.Length) { Array.Copy(_buffer, _startIndex, result, 0, len); if (!keepData) _startIndex += len; return result; } else { var endLen = _buffer.Length - _startIndex; var remainingLen = len - endLen; Array.Copy(_buffer, _startIndex, result, 0, endLen); Array.Copy(_buffer, 0, result, endLen, remainingLen); if (!keepData) _startIndex = remainingLen; return result; } } public byte this[int index] { get { if (index >= Count) throw new ArgumentOutOfRangeException(); return _buffer[(_startIndex + index) % _buffer.Length]; } } public IEnumerable Bytes { get { for (var i = 0; i < Count; i++) yield return _buffer[(_startIndex + i) % _buffer.Length]; } } } 

请注意:代码“消耗”读取 - 如果您不希望只删除“_startIndex = ...”部分(或进行重载可选参数和检查或其他)。

我建议您使用MemoryStream ,但将其封装在另一个存储的类中:

  • MemoryStream
  • 目前的“阅读”位置
  • 目前“消费”的位置

然后它会暴露:

  • 写入:将流的位置设置为结尾,写入数据,将流的位置设置回读取位置
  • 读取:读取数据,将读取位置设置为流的位置
  • 消耗:更新消耗的位置(根据您尝试消费的方式详细说明); 如果消耗位置高于某个阈值,则将现有的缓冲数据复制到新的MemoryStream并更新所有变量。 (您可能不希望在每个使用请求上复制缓冲区。)

请注意,如果没有额外的同步,这些都不是线程安全的。

这是我刚才写的缓冲区的另一个实现

  • 可resize :允许排队数据而不抛出缓冲区溢出exception;
  • 高效 :使用单个缓冲区和Buffer.Copy操作来排队/出列数据

来到这个晚了,但后人:

当我过去这样做时,我采取了一种略微不同的方法。 如果您的消息具有固定的标头大小(告诉您正文中有多少字节),并且记住网络流已经缓冲,我将分两个阶段执行操作:

  • 在流上读取标头的字节
  • 根据标头对流体的字节进行后续读取
  • 重复

这可以利用这样一个事实 – 对于一个流 – 当你要求’n’个字节时,你永远不会得到更多回报,所以你可以忽略许多’我读过太多的opps,让我把它们放到下一次’问题。

现在这不是整个故事,公平。 我在流上有一个底层包装类来处理碎片问题(即如果要求4个字节,则在收到4个字节或流关闭之前不返回)。 但这一点相当容易。

在我看来,关键是将消息处理与流机制分离,如果您停止尝试将消息作为流中的单个ReadBytes()使用,生活变得更加简单。

[无论您的读取是阻塞还是异步(APM /等待),所有这一切都是正确的]

我认为BufferedStream是解决问题的方法。 也可以通过调用Seek来读取未读的len字节数据。

 BufferdStream buffer = new BufferedStream(tcpStream, size); // we have a buffer of size ... ... while(...) { buffer.Read(...); // do my staff // I read too much, I want to put back len bytes buffer.Seek(-len, SeekOrigin.End); // I shall come back and read later } 

成长记忆

与最初指定size BufferedStream相反, MemoryStream可以增长。

记住流数据

MemoryStream始终保存所有dara-read,而BufferedStream仅保存一段流数据。

源流与字节数组

MemoryStream允许在Write()方法中添加输入字节,将来可以是Read() 。 而BufferedSteam从构造函数中指定的另一个源流中获取输入字节。

听起来你想从套接字读入MemoryStream缓冲区,然后从缓冲区“弹出”数据并在每次遇到某个字节时重置它。 它看起来像这样:

 void ReceiveAllMessages(Action messageReceived, Socket socket) { var currentMessage = new MemoryStream(); var buffer = new byte[128]; while (true) { var read = socket.Receive(buffer, 0, buffer.Length); if (read == 0) break; // Connection closed for (var i = 0; i < read; i++) { var currentByte = buffer[i]; if (currentByte == END_OF_MESSAGE) { var message = currentMessage.ToByteArray(); messageReceived(message); currentMessage = new MemoryStream(); } else { currentMessage.Write(currentByte); } } } } 

您可以使用Stream包装ConcurrentQueue>来执行此ConcurrentQueue> (请记住,这仅使其向前)。 但是我真的不喜欢在使用它之前将数据保存在内存中的想法; 它可以让你了解有关邮件大小的一系列攻击(有意或无意)。 您可能还想要Google“循环缓冲区” 。

实际上,您应该在收到数据后立即编写对数据有意义的代码:’Push Parsing’(例如, SAX支持这就是)。 作为如何使用文本执行此操作的示例:

 private Encoding _encoding; private Decoder _decoder; private char[] _charData = new char[4]; public PushTextReader(Encoding encoding) { _encoding = encoding; _decoder = _encoding.GetDecoder(); } // A single connection requires its own decoder // and charData. That connection should never // call this method from multiple threads // simultaneously. // If you are using the ReadAsyncLoop you // don't need to worry about it. public void ReceiveData(ArraySegment data) { // The two false parameters cause the decoder // to accept 'partial' characters. var charCount = _decoder.GetCharCount(data.Array, data.Offset, data.Count, false); charCount = _decoder.GetChars(data.Array, data.Offset, data.Count, _charData, 0, false); OnCharacterData(new ArraySegment(_charData, 0, charCount)); } 

如果在反序列化之前必须能够接受完整的消息,则可以使用MemoryMappedFile ,其优点是发送实体将无法使服务器内存不足。 棘手的是将文件重置为零; 因为那可能是一堆问题。 解决这个问题的一种方法是:

TCP接收器结束

  1. 写入当前流。
  2. 如果流超过一定长度则移动到新的。

反序列化结束

  1. 从当前流中读取。
  2. 一旦你清空了流就摧毁它。

TCP接收器端非常简单。 解串器端需要一些基本的缓冲区拼接逻辑(记得使用Buffer.BlockCopy而不是Array.Copy )。

旁注:听起来像一个有趣的项目,如果我有时间并记得我可以继续实施这个系统。

这里只有三个答案提供代码。 其中一个是笨拙的,其他人没有回答这个问题。

这是一个可以复制和粘贴的类:

 ///  /// This class is a very fast and threadsafe FIFO buffer ///  public class FastFifo { private List mi_FifoData = new List(); ///  /// Get the count of bytes in the Fifo buffer ///  public int Count { get { lock (mi_FifoData) { return mi_FifoData.Count; } } } ///  /// Clears the Fifo buffer ///  public void Clear() { lock (mi_FifoData) { mi_FifoData.Clear(); } } ///  /// Append data to the end of the fifo ///  public void Push(Byte[] u8_Data) { lock (mi_FifoData) { // Internally the .NET framework uses Array.Copy() which is extremely fast mi_FifoData.AddRange(u8_Data); } } ///  /// Get data from the beginning of the fifo. /// returns null if s32_Count bytes are not yet available. ///  public Byte[] Pop(int s32_Count) { lock (mi_FifoData) { if (mi_FifoData.Count < s32_Count) return null; // Internally the .NET framework uses Array.Copy() which is extremely fast Byte[] u8_PopData = new Byte[s32_Count]; mi_FifoData.CopyTo(0, u8_PopData, 0, s32_Count); mi_FifoData.RemoveRange(0, s32_Count); return u8_PopData; } } ///  /// Gets a byte without removing it from the Fifo buffer /// returns -1 if the index is invalid ///  public int PeekAt(int s32_Index) { lock (mi_FifoData) { if (s32_Index < 0 || s32_Index >= mi_FifoData.Count) return -1; return mi_FifoData[s32_Index]; } } }