在C#中缓冲字节数据
我的应用程序从TCP套接字读取字节并需要缓冲它们,以便我可以稍后从中提取消息。 由于TCP的性质,我可能在一次读取中得到部分或多个消息,因此在每次读取之后,我想检查缓冲区并提取尽可能多的完整消息。
因此,我想要一个允许我执行以下操作的课程:
- 将任意byte []数据附加到它
- 在不消费内容的情况下检查内容,特别是检查内容量并搜索某个字节或字节的存在
- 以byte []的forms提取和使用部分数据,而将其余部分留在那里以备将来读取
我希望我想要的东西可以用.NET库中的一个或多个现有类完成,但我不确定哪些类。 System.IO.MemoryStream看起来接近我想要的,但是(a)不清楚它是否适合用作缓冲区(读取数据是否从容量中删除?)和(b)读写似乎发生在同一个地方 – “流的当前位置是下一次读取或写入操作可能发生的位置。” – 这不是我想要的。 我需要写到最后并从前面阅读。
只需使用一个大字节数组和Array.Copy – 它应该可以解决问题。 如果没有,请使用List
。
如果你使用数组,你必须自己实现一个索引(你在那里复制其他数据)(同样用于检查内容大小),但它很简单。
如果您感兴趣:这是一个“循环缓冲区”的简单实现。 测试应该运行(我在它上面进行了几次unit testing,但没有检查所有关键路径):
public class ReadWriteBuffer { private readonly byte[] _buffer; private int _startIndex, _endIndex; public ReadWriteBuffer(int capacity) { _buffer = new byte[capacity]; } public int Count { get { if (_endIndex > _startIndex) return _endIndex - _startIndex; if (_endIndex < _startIndex) return (_buffer.Length - _startIndex) + _endIndex; return 0; } } public void Write(byte[] data) { if (Count + data.Length > _buffer.Length) throw new Exception("buffer overflow"); if (_endIndex + data.Length >= _buffer.Length) { var endLen = _buffer.Length - _endIndex; var remainingLen = data.Length - endLen; Array.Copy(data, 0, _buffer, _endIndex, endLen); Array.Copy(data, endLen, _buffer, 0, remainingLen); _endIndex = remainingLen; } else { Array.Copy(data, 0, _buffer, _endIndex, data.Length); _endIndex += data.Length; } } public byte[] Read(int len, bool keepData = false) { if (len > Count) throw new Exception("not enough data in buffer"); var result = new byte[len]; if (_startIndex + len < _buffer.Length) { Array.Copy(_buffer, _startIndex, result, 0, len); if (!keepData) _startIndex += len; return result; } else { var endLen = _buffer.Length - _startIndex; var remainingLen = len - endLen; Array.Copy(_buffer, _startIndex, result, 0, endLen); Array.Copy(_buffer, 0, result, endLen, remainingLen); if (!keepData) _startIndex = remainingLen; return result; } } public byte this[int index] { get { if (index >= Count) throw new ArgumentOutOfRangeException(); return _buffer[(_startIndex + index) % _buffer.Length]; } } public IEnumerable Bytes { get { for (var i = 0; i < Count; i++) yield return _buffer[(_startIndex + i) % _buffer.Length]; } } }
请注意:代码“消耗”读取 - 如果您不希望只删除“_startIndex = ...”部分(或进行重载可选参数和检查或其他)。
我建议您使用MemoryStream
,但将其封装在另一个存储的类中:
-
MemoryStream
- 目前的“阅读”位置
- 目前“消费”的位置
然后它会暴露:
- 写入:将流的位置设置为结尾,写入数据,将流的位置设置回读取位置
- 读取:读取数据,将读取位置设置为流的位置
- 消耗:更新消耗的位置(根据您尝试消费的方式详细说明); 如果消耗位置高于某个阈值,则将现有的缓冲数据复制到新的
MemoryStream
并更新所有变量。 (您可能不希望在每个使用请求上复制缓冲区。)
请注意,如果没有额外的同步,这些都不是线程安全的。
这是我刚才写的缓冲区的另一个实现 :
- 可resize :允许排队数据而不抛出缓冲区溢出exception;
- 高效 :使用单个缓冲区和Buffer.Copy操作来排队/出列数据
来到这个晚了,但后人:
当我过去这样做时,我采取了一种略微不同的方法。 如果您的消息具有固定的标头大小(告诉您正文中有多少字节),并且记住网络流已经缓冲,我将分两个阶段执行操作:
- 在流上读取标头的字节
- 根据标头对流体的字节进行后续读取
- 重复
这可以利用这样一个事实 – 对于一个流 – 当你要求’n’个字节时,你永远不会得到更多回报,所以你可以忽略许多’我读过太多的opps,让我把它们放到下一次’问题。
现在这不是整个故事,公平。 我在流上有一个底层包装类来处理碎片问题(即如果要求4个字节,则在收到4个字节或流关闭之前不返回)。 但这一点相当容易。
在我看来,关键是将消息处理与流机制分离,如果您停止尝试将消息作为流中的单个ReadBytes()使用,生活变得更加简单。
[无论您的读取是阻塞还是异步(APM /等待),所有这一切都是正确的]
我认为BufferedStream
是解决问题的方法。 也可以通过调用Seek来读取未读的len
字节数据。
BufferdStream buffer = new BufferedStream(tcpStream, size); // we have a buffer of size ... ... while(...) { buffer.Read(...); // do my staff // I read too much, I want to put back len bytes buffer.Seek(-len, SeekOrigin.End); // I shall come back and read later }
成长记忆
与最初指定size
BufferedStream
相反, MemoryStream
可以增长。
记住流数据
MemoryStream
始终保存所有dara-read,而BufferedStream
仅保存一段流数据。
源流与字节数组
MemoryStream
允许在Write()
方法中添加输入字节,将来可以是Read()
。 而BufferedSteam
从构造函数中指定的另一个源流中获取输入字节。
听起来你想从套接字读入MemoryStream缓冲区,然后从缓冲区“弹出”数据并在每次遇到某个字节时重置它。 它看起来像这样:
void ReceiveAllMessages(Action messageReceived, Socket socket) { var currentMessage = new MemoryStream(); var buffer = new byte[128]; while (true) { var read = socket.Receive(buffer, 0, buffer.Length); if (read == 0) break; // Connection closed for (var i = 0; i < read; i++) { var currentByte = buffer[i]; if (currentByte == END_OF_MESSAGE) { var message = currentMessage.ToByteArray(); messageReceived(message); currentMessage = new MemoryStream(); } else { currentMessage.Write(currentByte); } } } }
您可以使用Stream
包装ConcurrentQueue
来执行此ConcurrentQueue
(请记住,这仅使其向前)。 但是我真的不喜欢在使用它之前将数据保存在内存中的想法; 它可以让你了解有关邮件大小的一系列攻击(有意或无意)。 您可能还想要Google“循环缓冲区” 。
实际上,您应该在收到数据后立即编写对数据有意义的代码:’Push Parsing’(例如, SAX支持这就是)。 作为如何使用文本执行此操作的示例:
private Encoding _encoding; private Decoder _decoder; private char[] _charData = new char[4]; public PushTextReader(Encoding encoding) { _encoding = encoding; _decoder = _encoding.GetDecoder(); } // A single connection requires its own decoder // and charData. That connection should never // call this method from multiple threads // simultaneously. // If you are using the ReadAsyncLoop you // don't need to worry about it. public void ReceiveData(ArraySegment data) { // The two false parameters cause the decoder // to accept 'partial' characters. var charCount = _decoder.GetCharCount(data.Array, data.Offset, data.Count, false); charCount = _decoder.GetChars(data.Array, data.Offset, data.Count, _charData, 0, false); OnCharacterData(new ArraySegment(_charData, 0, charCount)); }
如果在反序列化之前必须能够接受完整的消息,则可以使用MemoryMappedFile ,其优点是发送实体将无法使服务器内存不足。 棘手的是将文件重置为零; 因为那可能是一堆问题。 解决这个问题的一种方法是:
TCP接收器结束
- 写入当前流。
- 如果流超过一定长度则移动到新的。
反序列化结束
- 从当前流中读取。
- 一旦你清空了流就摧毁它。
TCP接收器端非常简单。 解串器端需要一些基本的缓冲区拼接逻辑(记得使用Buffer.BlockCopy
而不是Array.Copy
)。
旁注:听起来像一个有趣的项目,如果我有时间并记得我可以继续实施这个系统。
这里只有三个答案提供代码。 其中一个是笨拙的,其他人没有回答这个问题。
这是一个可以复制和粘贴的类:
/// /// This class is a very fast and threadsafe FIFO buffer /// public class FastFifo { private List mi_FifoData = new List (); /// /// Get the count of bytes in the Fifo buffer /// public int Count { get { lock (mi_FifoData) { return mi_FifoData.Count; } } } /// /// Clears the Fifo buffer /// public void Clear() { lock (mi_FifoData) { mi_FifoData.Clear(); } } /// /// Append data to the end of the fifo /// public void Push(Byte[] u8_Data) { lock (mi_FifoData) { // Internally the .NET framework uses Array.Copy() which is extremely fast mi_FifoData.AddRange(u8_Data); } } /// /// Get data from the beginning of the fifo. /// returns null if s32_Count bytes are not yet available. /// public Byte[] Pop(int s32_Count) { lock (mi_FifoData) { if (mi_FifoData.Count < s32_Count) return null; // Internally the .NET framework uses Array.Copy() which is extremely fast Byte[] u8_PopData = new Byte[s32_Count]; mi_FifoData.CopyTo(0, u8_PopData, 0, s32_Count); mi_FifoData.RemoveRange(0, s32_Count); return u8_PopData; } } /// /// Gets a byte without removing it from the Fifo buffer /// returns -1 if the index is invalid /// public int PeekAt(int s32_Index) { lock (mi_FifoData) { if (s32_Index < 0 || s32_Index >= mi_FifoData.Count) return -1; return mi_FifoData[s32_Index]; } } }