使用自定义流(IEnumerable )

我正在使用Stream的自定义实现,它将IEnumerable流式传输到流中。 我正在使用此EnumerableStream实现来执行转换。

我正在使用它在流模式下通过WCF执行流式传输。 我能够将IEnumerable转换为流而没有问题。 有一次,我在客户端,我可以反序列化并获取所有数据,但是我无法找到停止在我的流上循环的条件。 我越来越:

System.Runtime.Serialization.SerializationException:解析完成之前遇到的Stream of Stream。

这是我想要实现的示例:

 class Program { public static void Main() { var ListToSend = new List<List>(); var ListToReceive = new List<List>(); ListToSend = SimulateData().ToList(); using (Stream stream = GetStream(ListToSend)) { var formatter = new BinaryFormatter(); while (stream.CanRead || 1 == 1 || true...) // What should I put in here to stop once I read everything??? { List row = formatter.Deserialize(stream) as List; ListToReceive.Add(row); } Printer(ListToReceive); Console.WriteLine("Done"); } } private static void Printer(List<List> data) { Console.WriteLine("Printing"); foreach (var row in data) { foreach (var cell in row) { Console.Write(cell + "\t"); } Console.WriteLine("-------------------------------------------------------------------------------"); } } private static Stream GetStream(IEnumerable<List> data) { return EnumerableStream.Create(data, DeserializerCallback); } private static List DeserializerCallback(object obj) { var binFormatter = new BinaryFormatter(); var mStream = new MemoryStream(); binFormatter.Serialize(mStream, obj); return mStream.ToArray().ToList(); } private static IEnumerable<List> SimulateData() { Random randomizer = new Random(); for (var i = 0; i < 10; i++) { var row = new List(); for (var j = 0; j < 1000; j++) { row.Add((randomizer.Next(100)).ToString()); } yield return row; } } } 

我没有包含自定义流。 我为那些想要查看整个代码的人创建了一个小提琴 。

  • 我是否需要在自定义流本身中添加一些内容以通知所有数据都已被读取?
  • 是因为反序列化器和序列化器的格式不一样(我不这么认为)。
  • 我也想知道为什么当我在读取函数中设置断点时,缓冲区大小会随机变化。
  • 请继续 ,通过尝试并捕获代码回答问题,这不是我想要的答案。 我想要一个不会崩溃的干净解决方案。 谢谢。

如果有人能够启发我会很棒!

我是否需要在自定义流本身中添加一些内容以通知所有数据都已被读取?

您可以,但这在WCF场景中没有帮助,其中收到的Stream是不同的类。

有两种标准(官方,按设计)方式确定Stream数据的结束:

(1) ReadByte返回-1

返回

无符号字节转换为Int32,如果在流的末尾则为-1。

(2)当count > 0时调用返回count > 0

返回

读入缓冲区的总字节数。 如果许多字节当前不可用,则这可以小于请求的字节数,如果已到达流的末尾,则可以小于零(0)。

不幸的是,它们都消耗当前字节(前进到下一个)并且会破坏解串器。

有哪些可能的解决方案?

首先,实现一些序列化/反序列化格式(协议),它允许您知道是否有更多要反序列化的元素。 例如, List在元素之前存储CountT[]在元素之前存储Length等。由于EnumerableStream事先不知道计数,一个简单的解决方案是在每个元素之前发出一个伪字节:

 private bool SerializeNext() { if (!_source.MoveNext()) return false; buf.Enqueue(1); // <-- foreach (var b in _serializer(_source.Current)) _buf.Enqueue(b); return true; } 

这将允许您使用

 while (stream.ReadByte() != -1) { // ... } 

其次,如果你想保留当前格式,更通用的解决方案是实现一个自定义流,它包装另一个流并使用与标准ReadByte相同的语义实现PeekByte方法,但不消耗当前字节:

 public class SequentialStream : Stream { private Stream source; private bool leaveOpen; private int? nextByte; public SequentialStream(Stream source, bool leaveOpen = false) { if (source == null) throw new ArgumentNullException(nameof(source)); if (!source.CanRead) throw new ArgumentException("Non readable source.", nameof(source)); this.source = source; this.leaveOpen = leaveOpen; } protected override void Dispose(bool disposing) { if (disposing && !leaveOpen) source.Dispose(); base.Dispose(disposing); } public override bool CanRead => true; public override bool CanSeek => false; public override bool CanWrite => false; public override long Length => throw new NotSupportedException(); public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); } public override void Flush() { } public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); public override void SetLength(long value) => throw new NotSupportedException(); public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); public int PeekByte() { if (nextByte == null) nextByte = source.ReadByte(); return nextByte.Value; } public override int Read(byte[] buffer, int offset, int count) { if (count <= 0) return 0; if (nextByte != null) { if (nextByte.Value < 0) return 0; buffer[offset] = (byte)nextByte.Value; if (count > 1) { int read = source.Read(buffer, offset + 1, count - 1); if (read == 0) nextByte = -1; else nextByte = null; return read + 1; } else { nextByte = null; return 1; } } else { int read = source.Read(buffer, offset, count); if (read == 0) nextByte = -1; return read; } } } 

这基本上实现了只读前向流,具有0或1字节的预读function。

用法如下:

 using (var stream = new SequentialStream(GetStream(ListToSend))) { // ... while (stream.PeekByte() != -1) { // ... } // ... } 

PS怎么样?

我也想知道为什么当我在读取函数中设置断点时,缓冲区大小会随机变化。

这不是随机的。 BinaryFormatter内部使用BinaryReader读取类型值,如Int32ByteString等,传递所需的大小作为count ,例如4,1,字符串编码字节的数量(它知道,因为它在实际数据之前将它们存储在流中)在尝试读取实际数据之前读取它)等。

首先,您可以简单地序列化List>本身。 在这里演示 。 这样就不需要这个专门的类来读取流。 并且可能使这个答案没有实际意义。 一次流式传输一个的唯一目的是潜在的非常大的数据集。 在这种情况下将需要不同的实现,这是以下解决方案可能解决的问题。

以下答案(和您的代码)要求读取流的客户端具有EnumerableStream类。

我是否需要在自定义流本身中添加一些内容以通知所有数据都已被读取?

是。 您需要实现一个新属性以了解是否有另一个T要读取,或使用长度。

public bool HasMore { get { return _buf.Any() || SerializeNext();} }

要么

public override long Length { get { return (_buf.Any() || SerializeNext()) ? 1 : 0; } }

我觉得整个解决方案可以清理成IEnumerable StreamReader 。 但是,这有效。

这是经过调整和工作的小提琴手。 请注意,我也清理了一下。 名为与其他类相同的静态类让我头疼;)。 另外,我会改为byte[] ,而不是List

是因为反序列化器和序列化器的格式不一样(我不这么认为)。

没有。

我也想知道为什么当我在读取函数中设置断点时,缓冲区大小会随机变化。

缓冲区_buf应该是序列化的当前项的大小。 这可能因项目而异。

请继续,通过尝试并捕获代码回答问题,这不是我想要的答案。 我想要一个没有崩溃且不会崩溃的干净解决方案。 谢谢。

明智的做法是不采取只是吞下exception的方法,而是理解如何使其按预期工作。

实现length属性:

 public override long Length { get { return (_buf.Any() || SerializeNext()) ? 1 : 0; } } 

然后检查长度:

  while (stream.Length > 0) { List row = formatter.Deserialize(stream) as List; ListToReceive.Add(row); } 

我已经在你的小提琴中测试了它,它运作良好。

这是@TheSoftwareJedi解决方案的一种非常类似的方法,但它使用了Length属性,在这种情况下,它将返回你在流中知道的元素的长度。 据我所知,这并不反对使用这种财产。