使用protobuf-net进行惰性,流驱动的对象序列化

我们正在开发用于流式传输大量数据的WCF服务,因此我们选择使用WCF流function与protobuf-net序列化相结合。

语境:

通常,一个想法是序列化服务中的对象,将它们写入流并发送。 在另一端,调用者将接收Stream对象,它可以读取所有数据。

所以目前服务方法代码看起来有点像这样:

public Result TestMethod(Parameter parameter) { // Create response var responseObject = new BusinessResponse { Value = "some very large data"}; // The resposne have to be serialized in advance to intermediate MemoryStream var stream = new MemoryStream(); serializer.Serialize(stream, responseObject); stream.Position = 0; // ResultBody is a stream, Result is a MessageContract return new Result {ResultBody = stream}; } 

BusinessResponse对象被序列化为MemoryStream,并从方法返回。 在客户端,调用代码如下所示:

 var parameter = new Parameter(); // Call the service method var methodResult = channel.TestMethod(parameter); // protobuf-net deserializer reads from a stream received from a service. // while reading is performed by protobuf-net, // on the service side WCF is actually reading from a // memory stream where serialized message is stored var result = serializer.Deserialize(methodResult.ResultBody); return result; 

因此,当调用serializer.Deserialize()时,它从流methodResult.ResultBody读取,同时在服务端WCF正在读取已从TestMethod返回的MemoryStream。

问题:

我们想要实现的是立即摆脱MemoryStream和服务端整个对象的初始序列化。 由于我们使用流式传输,因此我们希望避免在发送之前将序列化对象保留在内存中。

理念:

完美的解决方案是返回一个空的,自定义的Stream对象(来自TestMethod() ),并引用一个要序列化的对象(在我的示例中为’BusinessResponse’对象)。 因此,当WCF调用流的Read()方法时,我使用protobuf-net在内部序列化一个对象,并将其返回给调用者,而不将其存储在内存中。

现在有一个问题,因为我们实际需要的是在读取流时可以逐个序列化对象的可能性。 我知道这是完全不同的序列化方式 – 而不是将对象推送到序列化器,我想逐件请求序列化内容。

是否可以使用protobuf-net进行某种序列化?

我编写了一些可能与Marc门概念相符的代码。

 public class PullStream : Stream { private byte[] internalBuffer; private bool ended; private static ManualResetEvent dataAvailable = new ManualResetEvent(false); private static ManualResetEvent dataEmpty = new ManualResetEvent(true); public override bool CanRead { get { return true; } } public override bool CanSeek { get { return false; } } public override bool CanWrite { get { return true; } } public override void Flush() { throw new NotImplementedException(); } public override long Length { get { throw new NotImplementedException(); } } public override long Position { get { throw new NotImplementedException(); } set { throw new NotImplementedException(); } } public override int Read(byte[] buffer, int offset, int count) { dataAvailable.WaitOne(); if ( count >= internalBuffer.Length) { var retVal = internalBuffer.Length; Array.Copy(internalBuffer, buffer, retVal); internalBuffer = null; dataAvailable.Reset(); dataEmpty.Set(); return retVal; } else { Array.Copy(internalBuffer, buffer, count); internalBuffer = internalBuffer.Skip(count).ToArray(); // i know return count; } } public override long Seek(long offset, SeekOrigin origin) { throw new NotImplementedException(); } public override void SetLength(long value) { throw new NotImplementedException(); } public override void Write(byte[] buffer, int offset, int count) { dataEmpty.WaitOne(); dataEmpty.Reset(); internalBuffer = new byte[count]; Array.Copy(buffer, internalBuffer, count); Debug.WriteLine("Writing some data"); dataAvailable.Set(); } public void End() { dataEmpty.WaitOne(); dataEmpty.Reset(); internalBuffer = new byte[0]; Debug.WriteLine("Ending writes"); dataAvailable.Set(); } } 

这是一个简单的流后代类,只实现Read和Write(和End)。 没有数据时读取块,数据可用时写入块。 这样只涉及一个字节缓冲区。 其余的linq复制打开以进行优化;-)添加End方法,因此当没有数据可用且不再写入数据时,不会发生阻塞。

您必须从单独的线程写入此流。 我在下面显示:

  // create a large object var obj = new List(); for(int i = 0; i <= 1000; i ++) obj.Add(new ToSerialize { Test = "This is my very loooong message" }); // create my special stream to read from var ms = new PullStream(); new Thread(x => { ProtoBuf.Serializer.Serialize(ms, obj); ms.End(); }).Start(); var buffer = new byte[100]; // stream to write back to (just to show deserialization is working too) var ws = new MemoryStream(); int read; while ((read = ms.Read(buffer, 0, 100)) != 0) { ws.Write(buffer, 0, read); Debug.WriteLine("read some data"); } ws.Position = 0; var back = ProtoBuf.Serializer.Deserialize>(ws); 

我希望这能解决你的问题:-)无论如何编码这个很有趣。

此致,Jacco