如何正确观察非标准事件?

我是Reactive Extensions的新手,并且处理具有如下定义的事件的COM库:

public delegate void MyDelegate(int requestId, double price, int amount); public event MyDelegate MyEvent; 

我该如何正确观察这个? 我尝试使用Observable.FromEvent()但由于事件的参数不是EventArgs类型,我没有看到FromEvent()FromEventPattern()是如何工作的。

我目前的解决方法是将自定义委托附加到事件然后调用Subject.OnNext()但我猜这不是我应该怎么做。

这是我当前解决方法的一个示例:

  MyEvent += new MyDelegate((int requestId, double price, int amount) => { Task.Run(() => { var args = new MyArgs() { requestId = requestId, price = price, amount = amount, }; this.mySubject.OnNext(args); }); }); 

FromEvent有一个特殊的重载。 让你的头脑有点傻,但function签名如下:

 IObservable FromEvent(Func, TDelegate> conversion, Action addHandler, Action removeHandler); 

转换函数是这里的重要部分,基本上你告诉Rx你的委托如何映射到具体类型。

在您的场景中,它最终看起来像这样:

 Observable.FromEvent( converter => new MyDelegate( (id, price, amount) => converter(new MyArgs { RequestId = id, Price = price, Amount = amount }) ), handler => MyEvent += handler, handler => MyEvent -= handler); 

那么这一切又是什么呢? 在内部,它与你正在做的类似(我将从概念上解释它的作用,因为实现稍微复杂一些)。 当进行新的订阅时,将使用observer.OnNext作为converter参数传入converter函数。 这个lambda将返回一个新的MyDelegate实例,它包装了我们提供的转换函数( (id, price, amount) => ... )。 然后将其传递给handler => MyEvent += handler方法。

在每次触发事件之后,它将调用我们的lambda方法并将传递的参数转换为MyArgs一个实例,然后传递给converter / observer.OnNext

此外,对于所有这些魔法,它还将在您完成后清理事件处理程序,优雅地向下传递exception,并通过在多个观察者之间共享单个事件处理程序来管理内存。

源代码