如何将Observable序列化到云端并返回

我需要将处理序列(如本问题中如何使用.net RX组织数据处理器序列 )拆分为Azure环境中的多个计算单元。
我们的想法是将Observable序列序列化为Azure队列(或Service Bus)并将其反序列化。
如果生产者或消费者失败,其他方应该能够继续生产/消费。

任何人都可以建议一个优雅的方式来做这个和使用什么(Azure队列或服务总线)?
有没有人使用TCP Observable提供程序 – http://rxx.codeplex.com/wikipage?title=TCP%20Qbservable%20Provider这样的问题是否对一方的失败安全?

想象一下,你有一个使用以下API的消息队列

class MQ { public MQ(); // send a single message from your message queue public void send(string keyPath, string msg); // Receive a single message from your message queue public async Task receive(keyPath); } 

使这个RX兼容

 class MQRX: IObserver { MQ _mq; string _keyPath MQRX(string keyPath){ _mq = mq; _keyPath = keyPath; } IObservable Observe(){ return Observable.Defer(()=> mq.receive(keyPath).ToObservable() ).Repeat(); } void OnNext(string msg){ _mq.send(msg); } void OnError(Exception e){ // The message queue might not // support serializing exceptions // or it might or you might build // a protocol for it. } } 

以容错方式使用它。 注意如果OnError传递的上游抛出exception,则重试将重新订阅

 new MQRX("users/1/article/2"). Retry(). Subscribe((msg)=>Console.Writeln(msg)); 

例如,在写入方面,您可以每两秒发送一条消息,如果出现错误,则重试订阅生成器。 请注意,Observable.Interval中不太可能出现错误,只是每隔一段时间生成一条消息,但想象一下从文件或其他消息队列中读取消息。

 var mq = new MQRX("users/1/article/2"); Observable.Interval(TimeSpan.FromSeconds(2)). Select((x)=>x.ToString()). 

请注意,您可能应该使用IObservable Catch扩展方法而不是盲目地重试,因为您可能会一遍又一遍地得到相同的错误。 重试()。 订阅(MQ);

我在30行VB代码中编写了自己的UDP到RX包装器,它处理超时。 我认为TCP包装器类似。

 Imports System.Reactive Imports System.Reactive.Linq Imports System.Reactive.Threading.Tasks Imports System.Threading Imports System.Net Imports System.Net.Sockets Public Module UDP '''  ''' Generates an IObservable as a UDP stream on the IP endpoint with an optional ''' timeout value between results. '''  '''  '''  '''  Public Function StreamObserver( localPort As Integer, Optional timeout As Nullable(Of TimeSpan) = Nothing ) As IObservable(Of UdpReceiveResult) Return Linq.Observable.Using(Of UdpReceiveResult, UdpClient)( Function() New UdpClient(localPort), Function(udpClient) Dim o = Observable.Defer(Function() udpClient. ReceiveAsync(). ToObservable()) If Not timeout Is Nothing Then Return Observable.Timeout(o.Repeat(), timeout.Value) Else Return o.Repeat() End If End Function ) End Function End Module 

如果需要,您可以为写入端执行相同操作。 然后,您只需使用常规RX技术将数据序列化为UDP帧。

 new UDP.StreamObserver(8080, TimeSpan.FromSeconds(2)). Select( function(udpResult) MyDeserializeFunction(udpResult)). Subscribe( sub (result) DoSomething(result), sub(error) DoSomethingWithError )