Rx如何从pub / sub模式创建序列

我正在尝试使用Rx来评估从发布/子模式创建序列(即经典观察者模式,其中下一个元素由生产者发布)。 这基本上与.net事件相同,除了我们需要概括它以使得事件不是必需的,所以我无法利用Observable.FromEvent。 我玩过Observable.Create和Observable.Generate,发现自己最终不得不编写代码来处理pub / sub(即我必须编写生产者/消费者代码来存储已发布的项目,然后通过用它来调用IObserver.OnNext()),所以看起来我并没有真正利用Rx ……

我向下看正确的路径还是适合Rx?

谢谢

您的发布者只是将一些IObservables暴露为属性。 而且您的消费者只需Subscribe它们(或者在订阅之前做任何他们想要的Rx-fu)。

有时这就像在您的出版商中使用Subjects一样简单。 有时它更复杂,因为您的发布者实际上正在观察其他一些可观察的过程。

这是一个愚蠢的例子:

 public class Publisher { private readonly Subject _topic1; /// Observe Foo values on this topic public IObservable FooTopic { get { return _topic1.AsObservable(); } } private readonly IObservable _topic2; /// Observe the current time whenever our clock ticks public IObservable ClockTickTopic { get { return _topic2.Select(t => DateTime.Now); } } public Publisher() { _topic1 = new Subject(); // tick once each second _topic2 = Observable.Interval(TimeSpan.FromSeconds(1)); } /// Let everyone know about the new Foo public NewFoo(Foo foo) { _topic1.OnNext(foo); } } // interested code... Publisher p = ...; p.FooTopic.Subscribe(foo => ...); p.ClickTickTopic.Subscribe(currentTime => ...); // count how many foos occur during each clock tick p.FooTopic.Buffer(p.ClockTickTopic) .Subscribe(foos => Console.WriteLine("{0} foos during this interval", foos.Count)); 

使用RX绝对适合发布/发布。 这是一个演示,演示了使用IObservable和RX的最简单的pub / sub模式。

使用NuGet将Reactive Extensions(RX)添加到项目中,搜索rx-main并安装Reactive Extensions - Main Library

 using System; using System.Reactive.Subjects; namespace RX_2 { public static class Program { static void Main(string[] args) { Subject stream = new Subject(); stream.Subscribe( o => { Console.Write(o); }); stream.Subscribe( o => { Console.Write(o); }); for (int i = 0; i < 5; i++) { stream.OnNext(i); } Console.ReadKey(); } } } 

执行时,代码输出:

 0011223344