处理基于线路的网络I / O流的好方法是什么?

注意:让我为这个问题的长度道歉,我不得不把很多信息都写进去。 我希望这不会导致太多人简单地浏览它并做出假设。 请完整阅读。 谢谢。

我有一个通过套接字进入的数据流。 这些数据是面向行的。

我正在使用.NET的APM(异步编程方法)(BeginRead等)。 这排除了使用基于流的I / O,因为异步I / O是基于缓冲区的。 可以重新打包数据并将其发送到流,例如内存流,但也存在问题。

问题是我的输入流(我无法控制)不会向我提供有关流的长度的任何信息。 它只是一个看起来像这样的换行符流:

COMMAND\n ...Unpredictable number of lines of data...\n END COMMAND\n ....repeat.... 

因此,使用APM,并且因为我不知道任何给定数据集将持续多长时间,很可能数据块将跨越需要多次读取的缓冲区边界,但这些多次读取也将跨越多个数据块。

例:

 Byte buffer[1024] = ".................blah\nThis is another l" [another read] "ine\n.............................More Lines..." 

我的第一个想法是使用StringBuilder并简单地将缓冲线附加到SB。 这在某种程度上起作用,但我发现很难提取数据块。 我尝试使用StringReader来读取新的数据但是无法知道你是否得到一个完整的行,因为StringReader在最后一个块的末尾返回一个部分行,然后返回null。 没有办法知道返回的内容是否是完整的新行数据。

例:

 // Note: no newline at the end StringBuilder sb = new StringBuilder("This is a line\nThis is incomp.."); StringReader sr = new StringReader(sb); string s = sr.ReadLine(); // returns "This is a line" s = sr.ReadLine(); // returns "This is incomp.." 

更糟糕的是,如果我继续追加数据,缓冲区会变得越来越大,因为这可能会持续几周或几个月,这不是一个好的解决方案。

我的下一个想法是在读取它时从SB中删除数据块。 这需要编写我自己的ReadLine函数,但后来我在读写期间卡住了数据。 此外,较大的数据块(可包含数百个读取和数兆字节的数据)需要扫描整个缓冲区以查找换行符。 它效率不高,而且非常丑陋。

我正在寻找具有StreamReader / Writer简单性的东西,同时具有异步I / O的便利性。

我的下一个想法是使用MemoryStream,并将数据块写入内存流,然后将StreamReader附加到流并使用ReadLine,但我再次知道缓冲区中的最后一次读取是完整的行还是不,再加上从流中删除“陈旧”数据就更难了。

我还考虑过使用带有同步读取的线程。 这样做的好处是,使用StreamReader时,除了断开的连接情况外,它总是从ReadLine()返回一个完整的行。 然而,这在取消连接时存在问题,并且某些类型的网络问题可能导致长时间挂起的阻塞套接字。 我正在使用异步IO,因为我不想在程序阻塞数据接收的生命周期中占用一个线程。

这种联系持久。 随着时间的推移,数据将继续流动。 在初始连接期间,存在大量数据,并且一旦完成该流程,套接字保持打开以等待实时更新。 我不确切知道初始流程何时“完成”,因为唯一的方法就是不再立即发送数据。 这意味着我不能等待在处理之前完成初始数据加载,我几乎在实时处理“实时”处理。

那么,任何人都可以提出一个很好的方法来处理这种情况,而不是过于复杂吗? 我真的希望它尽可能简单和优雅,但由于所有边缘情况,我不断提出越来越复杂的解决方案。 我想我想要的是某种FIFO,在其中我可以轻松地追加更多数据,同时从中弹出符合特定条件的数据(即换行终止字符串)。

这是一个非常有趣的问题。 过去,我的解决方案是使用具有同步操作的单独线程,如您所建议的那样。 (我设法解决了使用锁和许多exception处理程序阻塞套接字的大多数问题。)尽管如此,使用内置异步操作通常是可取的,因为它允许真正的操作系统级异步I / O,所以我理解你的观点。

好吧,我已经去写了一堂课来完成我认为你需要的东西(我会以相对干净的方式说)。 让我知道你的想法。

 using System; using System.Collections.Generic; using System.IO; using System.Text; public class AsyncStreamProcessor : IDisposable { protected StringBuilder _buffer; // Buffer for unprocessed data. private bool _isDisposed = false; // True if object has been disposed public AsyncStreamProcessor() { _buffer = null; } public IEnumerable Process(byte[] newData) { // Note: replace the following encoding method with whatever you are reading. // The trick here is to add an extra line break to the new data so that the algorithm recognises // a single line break at the end of the new data. using(var newDataReader = new StringReader(Encoding.ASCII.GetString(newData) + Environment.NewLine)) { // Read all lines from new data, returning all but the last. // The last line is guaranteed to be incomplete (or possibly complete except for the line break, // which will be processed with the next packet of data). string line, prevLine = null; while ((line = newDataReader.ReadLine()) != null) { if (prevLine != null) { yield return (_buffer == null ? string.Empty : _buffer.ToString()) + prevLine; _buffer = null; } prevLine = line; } // Store last incomplete line in buffer. if (_buffer == null) // Note: the (* 2) gives you the prediction of the length of the incomplete line, // so that the buffer does not have to be expanded in most/all situations. // Change it to whatever seems appropiate. _buffer = new StringBuilder(prevLine, prevLine.Length * 2); else _buffer.Append(prevLine); } } public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } private void Dispose(bool disposing) { if (!_isDisposed) { if (disposing) { // Dispose managed resources. _buffer = null; GC.Collect(); } // Dispose native resources. // Remember that object has been disposed. _isDisposed = true; } } } 

应该为每个NetworkStream创建此类的实例,并且每当接收到新数据时都应该调用Process函数(在BeginRead的回调方法中,在调用我想象的下一个BeginRead之前)。

注意:我只用测试数据validation了这段代码,而不是通过网络传输的实际数据。 但是,我不希望有任何差异……

此外,警告该类当然不是线程安全的,但只要在处理完当前数据之后不再执行BeginRead(正如我认为你正在做的那样),就不应该有任何问题。

希望这对你有用。 如果有剩余问题,请告诉我,我会尝试修改解决方案来处理它们。 (尽管仔细阅读,但我错过的问题可能会有些微妙!)

你在解释的问题是什么,让我想起ASCIZ字符串。 ( 链接文字 )。 这可能是一个有益的开始。

我不得不在大学里为我正在研究的项目写一些类似的东西。 不幸的是,我控制了发送套接字,所以我插入了一段消息字段作为协议的一部分。 但是,我认为类似的方法可能会让您受益。

我如何处理我的解决方案是我会发送类似5HELLO的东西,所以首先我会看到5,并且知道我的消息长度为5,因此我需要的消息是5个字符。 但是,如果我的异步读取,我只有5HE,我会看到我的消息长度为5,但我只能读取3个字节的线路(让我们假设ASCII字符)。 因此,我知道我丢失了一些字节,并将其存储在片段缓冲区中。 我每个插槽有一个片段缓冲区,因此避免了任何同步问题。 粗糙的过程是。

  1. 从套接字读入字节数组,记录读取的字节数
  2. 逐字节扫描,直到找到一个换行符(如果你没有收到ascii字符,这会变得非常复杂,但是这个字符可能是多个字节,你就是你自己的那个)
  3. 把你的碎片缓冲区变成一个字符串,然后将你的缓冲区追加到它的新行。 将此字符串作为已完成的消息拖放到队列或其自己的委托上进行处理。 (你可以通过实际让你读取套接字写入同一个字节数组来优化这些缓冲区,但是你很难解释)
  4. 继续循环,每次我们找到一个新行时,从记录的开始/结束位置的字节排列创建一个字符串,然后放入队列/委托进行处理。
  5. 一旦我们到达读缓冲区的末尾,将剩下的任何内容复制到frag缓冲区中。
  6. 调用套接字上的BeginRead,当套接字中有数据时,它将跳转到第1步。

然后你使用另一个Thread来读取你的incommign消息队列,或者让Threadpool使用delegates来处理它。 并做任何你必须做的数据处理。 如果我错了,有人会纠正我,但是这个线程同步问题很少,因为你只能在任何时候阅读或等待从套接字读取,所以不用担心锁(除非你是填充队列,我在我的实现中使用了委托)。 有一些细节需要你自己解决,比如要留下多大的碎片缓冲区,如果你在读取时收到0个换行符,整个消息必须附加到片段缓冲区而不会覆盖任何东西。 我认为它最终给我带来了大约700到800行代码,但其中包括连接设置,加密协商以及其他一些事情。

这个设置对我来说非常好; 我使用这种实现1.8Ghz opteron(包括加密处理)在100Mbps以太网LAN上执行高达80Mbps的速率。 由于您与套接字绑定,服务器将扩展,因为可以同时处理多个套接字。 如果您需要按顺序处理项目,则需要使用队列,但如果顺序无关紧要,那么委托将为您提供线程池之外的可扩展性能。

希望这有助于,不是一个完整的解决方案,而是一个开始寻找的方向。

*只是一个注释,我的实现完全是在字节级别和支持加密,我使用字符为我的例子,使其更容易可视化。