如何与发布和连接共享一个observable?

我有一个可观察的数据流,我正在应用操作,分成两个独立的流,对两个流中的每一个应用更多(不同的)操作,并再次合并在一起。 我试图使用Publish and Connect分享两个订阅者之间的可观察性,但每个订阅者似乎都在使用单独的流。 也就是说,在下面的示例中,我看到两个订阅者为流中的每个项目打印一次“执行昂贵的操作”。 (想象一下,昂贵的操作是在所有订阅者之间只发生一次的事情,因此我试图重用流。)我使用Publish and Connect尝试与两个订阅者共享合并的observable,但它似乎有错误的效果。

问题示例:

 var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false }); var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler); var expensive = timer.Select(i => { // Converting to strings is an expensive operation Console.WriteLine("Doing an expensive operation"); return string.Format("#{0}", i); }); var a = expensive.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s }); var b = expensive.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s }); var connectable = Observable.Merge(a, b).Publish(); connectable.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s)); connectable.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s)); connectable.Connect(); 

我看到以下输出:

 Doing expensive operation Doing expensive operation Subscriber A got: { Source = A, Value = #0 } Doing expensive operation Doing expensive operation Subscriber B got: { Source = B, Value = #1 } 

(输出继续,为简洁而截断。)

如何与两个订阅者共享observable?

您发布了错误的可观察对象。

使用当前代码合并然后发布像Observable.Merge(a, b).Publish(); 。 现在因为ab的定义是expensive你仍然可以获得两个expensive订阅。

订阅会创建这些管道:

原版的

如果你拿出.Publish();你可以看到这个.Publish(); 从你的代码。 输出变为:

 Doing an expensive operation Doing an expensive operation Doing an expensive operation Doing an expensive operation Subscriber A got: { Source = A, Value = #0 } Doing an expensive operation Doing an expensive operation Doing an expensive operation Doing an expensive operation Subscriber B got: { Source = B, Value = #1 } 

这会创建这些管道:

没有发布

因此,通过将.Publish()更改为expensive可以消除问题。 这就是你真正需要它的地方,因为它毕竟是昂贵的操作。

这是您需要的代码:

 var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false }); var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler); var expensive = timer.Select(i => { // Converting to strings is an expensive operation Console.WriteLine("Doing an expensive operation"); return string.Format("#{0}", i); }); var connectable = expensive.Publish(); var a = connectable.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s }); var b = connectable.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s }); var merged = Observable.Merge(a, b); merged.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s)); merged.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s)); connectable.Connect(); 

这很好地产生了以下内容:

 Doing an expensive operation Subscriber A got: { Source = A, Value = #0 } Doing an expensive operation Subscriber B got: { Source = B, Value = #1 } Doing an expensive operation Subscriber A got: { Source = A, Value = #2 } Doing an expensive operation Subscriber B got: { Source = B, Value = #3 } 

这给你这些管道:

昂贵的发布

您可以从此图像中看到仍然存在重复。 这很好,因为这些部件并不昂贵。

重复实际上很重要。 管道的共享部分使其端点易受错误影响,从而提前终止。 分享越少,代码的健壮性就越好。 只有当您进行昂贵的操作时才应该担心发布。 否则你应该让管道自己。

这是一个展示它的例子。 如果您没有已发布的源,那么,如果一个源产生错误,那么它不会下拉所有管道。

分离

但是一旦你引入了一个共享的observable,那么一个错误就会导致所有的管道崩溃。

共享

一个可能的解决方

 var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) { IsBackground = false }); var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler); var expensive = timer.Select(i => { // Converting to strings is an expensive operation Console.WriteLine("Doing an expensive operation"); return string.Format("#{0}", i); }); var subj = new ReplaySubject(); expensive.Subscribe(subj); var a = subj.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s }); var b = subj.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s }); var merged = Observable.Merge(a, b); merged.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s)); merged.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s)); 

上面的示例实际上创建了一个新的中间可观察对象,它发出了昂贵操作的结果。 这允许您订阅昂贵操作的结果,而不是应用于计时器的昂贵转换。

有了这个,你会看到:

 Doing an expensive operation Subscriber A got: { Source = A, Value = #0 } Doing an expensive operation Subscriber B got: { Source = B, Value = #1 } 

(输出继续,为简洁而截断。)

或者,您可以将调用移至PublishConnect

 var foregroundScheduler = new NewThreadScheduler(ts => new Thread(ts) {IsBackground = false}); var timer = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(10), foregroundScheduler); var expensive = timer.Select(i => { // Converting to strings is an expensive operation Console.WriteLine("Doing an expensive operation"); return string.Format("#{0}", i); }).Publish(); var a = expensive.Where(s => int.Parse(s.Substring(1)) % 2 == 0).Select(s => new { Source = "A", Value = s }); var b = expensive.Where(s => int.Parse(s.Substring(1)) % 2 != 0).Select(s => new { Source = "B", Value = s }); var merged = Observable.Merge(a, b); merged.Where(x => x.Source.Equals("A")).Subscribe(s => Console.WriteLine("Subscriber A got: {0}", s)); merged.Where(x => x.Source.Equals("B")).Subscribe(s => Console.WriteLine("Subscriber B got: {0}", s)); expensive.Connect(); 

为什么选择ReplaySubject ,而不仅仅是Subject或其他主题?

默认情况下,.NET Rx实现中的Subject是ReactiveX文档调用PublishSubject ,该发布PublishSubject仅向观察者发出在订阅时间之后由源Observable发出的项目。 另一方面, ReplaySubject向任何观察者发出源Observable发出的所有项目, 无论观察者何时订阅 。 如果我们在第一个示例中使用普通主题,则将subj订阅到计时器将导致对subj订阅错过在主题订阅昂贵操作的时间与他们订阅中间主题的时间之间发出的任何内容( subj )。