使用TcpClient和Reactive Extensions从Stream读取连续字节流

请考虑以下代码:

internal class Program { private static void Main(string[] args) { var client = new TcpClient(); client.ConnectAsync("localhost", 7105).Wait(); var stream = client.GetStream(); var observable = stream.ReadDataObservable().Repeat(); var s = from d in observable.Buffer(4) let headerLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt16(d.ToArray(), 2)) let b = observable.Take(headerLength) select b.ToEnumerable().ToArray(); s.Subscribe(a => Console.WriteLine("{0}", a)); Console.ReadLine(); } } public static class Extensions { public static IObservable ReadDataObservable(this Stream stream) { return Observable.Defer(async () => { var buffer = new byte[1024]; var readBytes = await stream.ReadAsync(buffer, 0, buffer.Length); return buffer.Take(readBytes).ToObservable(); }); } } 

基本上我想解析我通过Reactive Extensions收到的消息。 使用Buffer(4)正确解析消息的头部,并获得消息剩余部分的长度。 出现的问题是,当我执行stream.Take(headerLength)时,代码重新评估整个“链”并尝试从流中获取新消息,而不是返回已从流中读取的其余字节。 更确切地说,第一个ReadAsync(…)返回38个字节,Buffer(4)返回前4个,observable.Take(headerLength)不返回剩余的34个字节,而是尝试读取新的ReadAsync消息。

问题是,如何确保observable.Take(headerLength)接收已读取的34个字节而不尝试从流中读取新消息? 我一直在寻找解决方案,但我无法弄清楚如何实现这一目标。

编辑:这个解决方案( 使用Reactive Extensions(Rx)实现套接字编程? )不是我想要的。 这不是读取流中的所有可用内容(最多为buffersize),而是从中获取连续的字节流。 对我来说,这个解决方案似乎不是从流中读取的一种非常有效的方式,因此我的问题。

这种方法不起作用。 问题是你使用observable的方式。 Buffer不会读取4个字节并退出,它将连续读取4个字节的块。 Take构成了第二个订阅,它将读取重叠的字节。 您会发现将流直接解析为消息要容易得多。

以下代码也做了很多努力来正确清理。

假设您的Message就是这样,(为了测试添加了ToString ):

 public class Message { public byte[] PayLoad; public override string ToString() { return Encoding.UTF8.GetString(PayLoad); } } 

你已经获得了一个Stream然后你可以解析如下。 首先,从流中读取确切字节数的方法:

 public async static Task ReadExactBytesAsync( Stream stream, byte[] buffer, CancellationToken ct) { var count = buffer.Length; var totalBytesRemaining = count; var totalBytesRead = 0; while (totalBytesRemaining != 0) { var bytesRead = await stream.ReadAsync( buffer, totalBytesRead, totalBytesRemaining, ct); ct.ThrowIfCancellationRequested(); totalBytesRead += bytesRead; totalBytesRemaining -= bytesRead; } } 

然后将流转换为IObservable

 public static IObservable ReadMessages( Stream sourceStream, IScheduler scheduler = null) { int subscribed = 0; scheduler = scheduler ?? Scheduler.Default; return Observable.Create(o => { // first check there is only one subscriber // (multiple stream readers would cause havoc) int previous = Interlocked.CompareExchange(ref subscribed, 1, 0); if (previous != 0) o.OnError(new Exception( "Only one subscriber is allowed for each stream.")); // we will return a disposable that cleans // up both the scheduled task below and // the source stream var dispose = new CompositeDisposable { Disposable.Create(sourceStream.Dispose) }; // use async scheduling to get nice imperative code var schedule = scheduler.ScheduleAsync(async (ctrl, ct) => { // store the header here each time var header = new byte[4]; // loop until cancellation requested while (!ct.IsCancellationRequested) { try { // read the exact number of bytes for a header await ReadExactBytesAsync(sourceStream, header, ct); } catch (OperationCanceledException) { throw; } catch (Exception ex) { // pass through any problem in the stream and quit o.OnError(new InvalidDataException("Error in stream.", ex)); return; } ct.ThrowIfCancellationRequested(); var bodyLength = IPAddress.NetworkToHostOrder( BitConverter.ToInt16(header, 2)); // create buffer to read the message var payload = new byte[bodyLength]; // read exact bytes as before try { await ReadExactBytesAsync(sourceStream, payload, ct); } catch (OperationCanceledException) { throw; } catch (Exception ex) { o.OnError(new InvalidDataException("Error in stream.", ex)); return; } // create a new message and send it to client var message = new Message { PayLoad = payload }; o.OnNext(message); } // wrap things up ct.ThrowIfCancellationRequested(); o.OnCompleted(); }); // return the suscription handle dispose.Add(schedule); return dispose; }); } 

编辑 – 我使用的非常hacky测试代码:

 private static void Main(string[] args) { var listener = new TcpListener(IPAddress.Any, 12873); listener.Start(); var listenTask = listener.AcceptTcpClientAsync(); listenTask.ContinueWith((Task t) => { var client = t.Result; var stream = client.GetStream(); const string messageText = "Hello World!"; var body = Encoding.UTF8.GetBytes(messageText); var header = BitConverter.GetBytes( IPAddress.HostToNetworkOrder(body.Length)); for (int i = 0; i < 5; i++) { stream.Write(header, 0, 4); stream.Write(body, 0, 4); stream.Flush(); // deliberate nasty delay Thread.Sleep(2000); stream.Write(body, 4, body.Length - 4); stream.Flush(); } stream.Close(); listener.Stop(); }); var tcpClient = new TcpClient(); tcpClient.Connect(new IPEndPoint(IPAddress.Loopback, 12873)); var clientStream = tcpClient.GetStream(); ReadMessages(clientStream).Subscribe( Console.WriteLine, ex => Console.WriteLine("Error: " + ex.Message), () => Console.WriteLine("Done!")); Console.ReadLine(); } 

包起来

如果服务器死了,您需要考虑设置读取超时,并且服务器应该发送某种“结束消息”。 目前,此方法将不断尝试接收字节。 正如你没有指出的那样,我没有包括这样的东西 – 但是如果你这样做的话,那么当我写完它时,只要打开while循环就会导致OnCompleted被发送。

我想这里需要的是Qactive :一个基于Rx.Net的可查询反应式tcp服务器提供程序

服务器

 Observable .Interval(TimeSpan.FromSeconds(1)) .ServeQbservableTcp(new IPEndPoint(IPAddress.Loopback, 3205)); 

客户

 var datasourceAddress = new IPEndPoint(IPAddress.Loopback, 3205); var datasource = new TcpQbservableClient(datasourceAddress); ( from value in datasource.Query() //The code below is actually executed on the server where value <= 5 || value >= 8 select value ) .Subscribe(Console.WriteLine); 

令人头疼的是,客户可以说出他们想要收到的数据的频率和频率,服务器仍然可以限制和控制返回的时间,频率和数据量。

有关此https://github.com/RxDave/Qactive的更多信息

另一个blog.sample

https://sachabarbs.wordpress.com/2016/12/23/rx-over-the-wire/