使用Reactive Extensions(Rx)实现套接字编程?

使用Rx编写GetMessages函数的最简洁方法是什么:

 static void Main() { Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); var messages = GetMessages(socket, IPAddress.Loopback, 4000); messages.Subscribe(x => Console.WriteLine(x)); Console.ReadKey(); } static IObservable GetMessages(Socket socket, IPAddress addr, int port) { var whenConnect = Observable.FromAsyncPattern(socket.BeginConnect, socket.EndConnect)(addr, port); // now will receive a stream of messages // each message is prefixed with an 4 bytes/Int32 indicating it's length. // the rest of the message is a string // ????????????? Now What ????????????? } 

一个简单的服务器作为上述示例的驱动程序: http : //gist.github.com/452893#file_program.cs

关于在套接字编程中使用Rx

我一直在调查使用Reactive Extensions来完成我正在做的一些套接字编程工作。 我这样做的动机是它会以某种方式使代码“更简单”。 这是否意味着更少的代码,更少的嵌套在这些线上。

但到目前为止似乎并非如此:

  1. 我没有找到很多使用Rx和套接字的例子
  2. 我发现的示例似乎没有现有的BeginXXXX,EndXXXX代码那么复杂
  3. 尽管Observable具有FromAsyncPattern的扩展方法,但这并不包括SocketEventArgs Async API。

当前的非工作解决方案

这是我到目前为止所拥有的。 这不起作用,它失败了堆栈溢出(呵呵)我还没有想出语义,所以我可以创建一个读取指定字节数的IObservable

  static IObservable GetMessages(Socket socket, IPAddress addr, int port) { var whenConnect = Observable.FromAsyncPattern(socket.BeginConnect, socket.EndConnect)(addr, port); // keep reading until we get the first 4 bytes byte[] buffer = new byte[1024]; var readAsync = Observable.FromAsyncPattern(socket.BeginReceive, socket.EndReceive); IObservable readBytes = null; var temp = from totalRead in Observable.Defer(() => readBytes) where totalRead  x).Sum(); var nowDoSomethingElse = readBytes.SkipUntil(whenConnect); } 

沿着这些方向的东西可以起作用。 这未经过测试,未考虑exception以及部分返回消息的情况。 但除此之外,我相信这是一个正确的方向。

  public static IObservable GetSocketData(this Socket socket, int sizeToRead, Func valueExtractor) { return Observable.CreateWithDisposable(observer => { var readSize = Observable .FromAsyncPattern( socket.BeginReceive, socket.EndReceive); var buffer = new byte[sizeToRead]; return readSize(buffer, 0, sizeToRead, SocketFlags.None) .Subscribe( x => observer.OnNext(valueExtractor(buffer)), observer.OnError, observer.OnCompleted); }); } public static IObservable GetMessageSize(this Socket socket) { return socket.GetSocketData(4, buf => BitConverter.ToInt32(buf, 0)); } public static IObservable GetMessageBody(this Socket socket, int messageSize) { return socket.GetSocketData(messageSize, buf => Encoding.UTF8.GetString(buf, 0, messageSize)); } public static IObservable GetMessage(this Socket socket) { return from size in socket.GetMessageSize() from message in Observable.If(() => size != 0, socket.GetMessageBody(size), Observable.Return(null)) select message; } public static IObservable GetMessagesFromConnected( this Socket socket) { return socket .GetMessage() .Repeat() .TakeWhile(msg => !string.IsNullOrEmpty(msg)); } public static IObservable GetMessages(this Socket socket, IPAddress addr, int port) { return Observable.Defer(() => { var whenConnect = Observable .FromAsyncPattern( socket.BeginConnect, socket.EndConnect); return from _ in whenConnect(addr, port) from msg in socket.GetMessagesFromConnected() .Finally(socket.Close) select msg; }); } 

编辑:为了处理不完整的读取,Observable.While可以在Dave Sexton在RX论坛的同一个线程中提出使用(在GetSockedData内)。

编辑:另外,看看这篇Jeffrey Van Gogh的文章: Asynchronous System.IO.Stream阅读

好的,所以这可能是“作弊”,但我想你可以重新设定我的非Rx答案并用Observable.Create包装它。

我很确定以IDisposable返回套接字是错误的语义,但不确定会是什么。

  static IObservable GetMessages(Socket socket, IPAddress addr, int port) { return Observable.CreateWithDisposable( o => { byte[] buffer = new byte[1024]; Action> readIntoBuffer = (length, callback) => { var totalRead = 0; AsyncCallback receiveCallback = null; AsyncCallback temp = r => { var read = socket.EndReceive(r); if (read == 0) { socket.Close(); o.OnCompleted(); return; } totalRead += read; if (totalRead < length) { socket.BeginReceive(buffer, totalRead, length - totalRead, SocketFlags.None, receiveCallback, null); } else { callback(length); } }; receiveCallback = temp; socket.BeginReceive(buffer, totalRead, length, SocketFlags.None, receiveCallback, null); }; Action sizeRead = null; Action messageRead = x => { var message = Encoding.UTF8.GetString(buffer, 0, x); o.OnNext(message); readIntoBuffer(4, sizeRead); }; Action temp2 = x => { var size = BitConverter.ToInt32(buffer, 0); readIntoBuffer(size, messageRead); }; sizeRead = temp2; AsyncCallback connectCallback = r => { socket.EndConnect(r); readIntoBuffer(4, sizeRead); }; socket.BeginConnect(addr, port, connectCallback, null); return socket; }); }