使用IObservable而不是事件

我最近一直在阅读有关IObservable的内容。 到目前为止,我已经查看了各种SO问题,并观看了他们可以做什么的video。 我正在思考的整个“推动”机制非常棒,但我仍在努力弄清楚究竟是什么。 从我的读数来看,我认为在某种程度上IObservable可以被“监视”,而IObservers则是“观察者”。

所以现在我要尝试在我的应用程序中实现它。 在我开始之前,有一些事情我想坚持下去。 我已经看到IObservable与IEnumerable相反,但是,我无法在我的特定实例中看到任何可以合并到我的应用程序中的地方。

目前,我大量使用事件,以至于我可以看到“管道”开始变得无法管理。 我想,IObservable可以帮助我。

考虑以下设计,这是我的应用程序中的I / O包装器(仅供参考,我通常需要处理字符串):

我有一个名为IDataIO的基本接口:

 public interface IDataIO { event OnDataReceived; event OnTimeout: event OnTransmit; } 

现在,我目前有三个实现此接口的类,这些类中的每一个都以某种方式利用异步方法调用,引入了某种类型的multithreading处理:

 public class SerialIO : IDataIO; public class UdpIO : IDataIO; public class TcpIO : IDataIO; 

每个类的一个实例都包含在我的最终类中,称为IO(它也实现了IDataIO – 遵循我的策略模式):

 public class IO : IDataIO { public SerialIO Serial; public UdpIO Udp; public TcpIO Tcp; } 

我已经利用策略模式来封装这三个类,这样当在运行时在不同的IDataIO实例之间进行更改时,它会对最终用户“隐藏”。 你可以想象,这导致了背景中的“事件管道”。

那么,我如何在我的案例中使用“推送”通知? 我想简单地将数据推送给任何感兴趣的人,而不是订阅事件(DataReceived等)。 我有点不确定从哪里开始。 我还在尝试使用Subject的想法/generics类,以及它的各种forms(ReplaySubject / AsynSubject / BehaviourSubject)。 有人可以请教我这个(也许参考我的设计)? 或者这根本不适合IObservable

PS。 随意纠正我的任何“误解”:)

Observable非常适合表示数据流,因此DataReceived事件可以很好地建模到可观察模式,例如IObservableIObservable 。 您还可以获得OnErrorOnComplete的额外好处。

在实现它方面,很难说出你的具体情况,但我们经常使用Subject作为底层源,并调用OnNext来推送数据。 也许是这样的

 // Using a subject is probably the easiest way to push data to an Observable // It wraps up both IObservable and IObserver so you almost never use IObserver directly private readonly Subject subject = new Subject(); private void OnPort_DataReceived(object sender, EventArgs e) { // This pushes the data to the IObserver, which is probably just a wrapper // around your subscribe delegate is you're using the Rx extensions this.subject.OnNext(port.Data); // pseudo code } 

然后,您可以通过属性公开主题:

 public IObservable DataObservable { get { return this.subject; } // Or this.subject.AsObservable(); } 

您可以使用IObservable替换IDataIO上的DataReceived事件,并让每个策略类以他们需要的方式处理他们的数据,然后推送到Subject

另一方面,订阅Observable的任何人都能够像事件一样处理它(只需使用Action ),或者你可以使用SelectWhereBuffer ,在流上执行一些非常有用的工作。等等

 private IDataIO dataIo = new ... private void SubscribeToData() { dataIo.DataObservable.Buffer(16).Subscribe(On16Bytes); } private void On16Bytes(IList bytes) { // do stuff } 

ReplaySubject / ConnectableObservable非常棒,如果您知道您的订阅者将迟到派对但仍需要了解所有事件。 源缓存它所推送的所有内容并为每个订户重放所有内容。 只有你可以说这是否是你真正需要的行为(但要小心,因为它会缓存所有会增加你的内存使用量的东西)。

当我学习Rx时,我发现http://licampbell.blogspot.co.uk/关于Rx的博客系列对于理解这个理论非常有用(post现在有点过时,API已经改变了,所以要注意这个)

这绝对是可观察的理想情况。 IO类可能会看到最大的改进。 首先,让我们更改界面以使用observables,看看组合类变得多么简单。

 public interface IDataIO { //you will have to fill in the types here. Either the event args //the events provide now or byte[] or something relevant would be good. IObservable DataReceived; IObservable Timeout; IObservable Transmit; } public class IO : IDataIO { public SerialIO Serial; public UdpIO Udp; public TcpIO Tcp; public IObservable DataReceived { get { return Observable.Merge(Serial.DataReceived, Udp.DataReceived, Tcp.DataReceived); } } //similarly for other two observables } 

侧面注意:您可能会注意到我更改了接口成员名称。 在.NET中,事件通常命名为 ,引发它们的函数称为On

对于生产类,您有几个选项取决于实际来源。 假设您在SerialIO中使用.NET SerialPort类,并且DataReceived返回IObservable 。 由于SerialPort已经有一个接收数据的事件,您可以直接使用它来创建您需要的可观察数据。

 public class SerialIO : IDataIO { private SerialPort _port; public IObservable DataRecived { get { return Observable.FromEventPattern( h => _port.DataReceived += h, h => _port.DataReceived -= h) .Where(ep => ep.EventArgs.EventType == SerialData.Chars) .Select(ep => { byte[] buffer = new byte[_port.BytesToRead]; _port.Read(buffer, 0, buffer.Length); return buffer; }); } } } 

对于您没有现有事件源的情况,您可能需要使用RichK建议的主题。 他的回答很好地涵盖了这种使用模式,所以我不会在这里复制它。

您没有展示如何使用此接口,但根据用例,在这些类上使用其他函数返回IObservable本身并完全取消这些“事件”可能更有意义。 使用基于事件的异步模式,您必须将事件与您调用的函数分开才能触发工作,但是使用observable,您可以从函数返回它们,以使其更明显地为您所订阅的内容。 该方法还允许从每次调用返回的observable发送OnErrorOnCompleted消息以指示操作结束。 根据您对组合类的使用,我不认为这在这种特殊情况下有用,但要记住这一点。

使用IObservable而不是事件

如果只对该房产感兴趣,那么nuget包rxx有:

 IObservable obs=Observable2.FromPropertyChangedPattern(() => obj.Name) 

(以及许多其他方法)


或者如果事件排除了属性更改/希望避免实现INotifyPropertyChanged

 class ObserveEvent_Simple { public static event EventHandler SimpleEvent; static void Main() { IObservable eventAsObservable = Observable.FromEventPattern( ev => SimpleEvent += ev, ev => SimpleEvent -= ev); } } 

类似于u / Gideon Engelberth来自http://rxwiki.wikidot.com/101samples#toc6

由https://rehansaeed.com/reactive-extensions-part2-wrapping-events/提供


此代码项目文章也致力于将事件转换为被动事件

https://www.codeproject.com/Tips/1078183/Weak-events-in-NET-using-Reactive-Extensions-Rx

并且还处理弱订阅