Observable.ObserveOn()似乎没有效果

我试图使用Rx并行处理项目。 似乎我不能告诉Rx并行运行我的观察者的OnNext()。 这是测试代码来演示

[Test] public void ObservableObserveOnNewThreadRunsInParallel() { Console.WriteLine("Starting thread: {0}", Thread.CurrentThread.ManagedThreadId); // store items as they are output var list = new List<Tuple>(); // used to wait until a sequences are complete var ev = new AutoResetEvent(false); // try these schedulers var schedulers = new[] { Tuple.Create("ThreadPoolScheduler.Instance", (IScheduler)ThreadPoolScheduler.Instance), Tuple.Create("NewThreadScheduler.Default", (IScheduler)NewThreadScheduler.Default), Tuple.Create("TaskPoolScheduler.Default", (IScheduler)TaskPoolScheduler.Default), Tuple.Create("Scheduler.Default", (IScheduler)Scheduler.Default), Tuple.Create("Scheduler.Immediate", (IScheduler)Scheduler.Immediate), }; // try each scheduler foreach (var schedulerTuple in schedulers) { // emit tuples  where delay decreases with each new tuple // such that output timing is expected to be reversed var observable = Observable.Range(0, 5) .Select(i => Tuple.Create((int)i, (int)(500 - i * 100))) .Take(5); var dt = DateTime.Now; Tuple scheduler = schedulerTuple; observable // specify the scheduler to use .ObserveOn(schedulerTuple.Item2) .Subscribe( v => { // emulate some work (first items take longer than last items) Thread.Sleep(v.Item2); // record when the item is done recording lock (list) list.Add( Tuple.Create( scheduler.Item1, v.Item1, v.Item2, Thread.CurrentThread.ManagedThreadId, dt - DateTime.Now)); }, // let the test go on () => ev.Set()); // wait until the end of the sequence ev.WaitOne(); } // print observed order foreach (var i in list) { Console.WriteLine(i); } } 

并输出:

 Starting thread: 5 (ThreadPoolScheduler.Instance, 0, 500, 9, -00:00:04.2514251) (ThreadPoolScheduler.Instance, 1, 400, 9, -00:00:04.6524652) (ThreadPoolScheduler.Instance, 2, 300, 9, -00:00:04.9524952) (ThreadPoolScheduler.Instance, 3, 200, 9, -00:00:05.1525152) (ThreadPoolScheduler.Instance, 4, 100, 9, -00:00:05.2525252) (NewThreadScheduler.Default, 0, 500, 11, -00:00:06.5066506) (NewThreadScheduler.Default, 1, 400, 11, -00:00:06.9066906) (NewThreadScheduler.Default, 2, 300, 11, -00:00:07.2067206) (NewThreadScheduler.Default, 3, 200, 11, -00:00:07.4067406) (NewThreadScheduler.Default, 4, 100, 11, -00:00:07.5067506) (TaskPoolScheduler.Default, 0, 500, 12, -00:00:00.5020502) (TaskPoolScheduler.Default, 1, 400, 12, -00:00:00.9020902) (TaskPoolScheduler.Default, 2, 300, 12, -00:00:01.2021202) (TaskPoolScheduler.Default, 3, 200, 12, -00:00:01.4021402) (TaskPoolScheduler.Default, 4, 100, 12, -00:00:01.5021502) (Scheduler.Default, 0, 500, 13, -00:00:00.5020502) (Scheduler.Default, 1, 400, 13, -00:00:00.9020902) (Scheduler.Default, 2, 300, 13, -00:00:01.2021202) (Scheduler.Default, 3, 200, 13, -00:00:01.4021402) (Scheduler.Default, 4, 100, 13, -00:00:01.5021502) (Scheduler.Immediate, 0, 500, 5, -00:00:00.5020502) (Scheduler.Immediate, 1, 400, 5, -00:00:00.9040904) (Scheduler.Immediate, 2, 300, 5, -00:00:01.2041204) (Scheduler.Immediate, 3, 200, 5, -00:00:01.4041404) (Scheduler.Immediate, 4, 100, 5, -00:00:01.5041504) 

请注意每个OnNext调用如何在前一次调用中等待,即使我明确使用ObserveOn()指定用于通知的调度程序。

我期望除了Scheduler.Immediate以外的所有内容并行运行通知。

谁知道我做错了什么?

这是设计的。 Rx的主要合同之一是必须序列化所有通知。

请参阅Rx设计指南中的§§4.2,6.7。

Observable表示Rx中的并发性,因此要重叠通知需要两个或更多可观察对象。 通知不会在同一观察者中重叠,但它们将相对于每个观察者重叠。

例如,如果需要同时执行两个方法(观察者),则需要定义两个observable。

从技术上讲,它是观察者(订阅)而不是并发所需的可观察量; 因此,根据观察者使用的调度程序,订阅两次相同的可观察量可以产生并发性; 但是,订阅两次相同的hot observable并不会导致并发。 (参见我的博文: Hot and Cold Observables 。)

ObserveOn在传递并发引入调度程序时引入了并发性。 但是如果不违反§6.7合同怎么办? 好吧,它将observable分成两个可观察量: 操作符之前和操作符之后 ! 或者,您可以将其视为两个订阅者或观察者: 之前之后 。 观察者之前ObserveOn提供的内部观察者。 after观察者是您的观察者,或查询中下一个运算符提供的观察者。

无论你如何看待它, 之前可观察的通知可以与after observable中的通知同时发生。 但是观察者只会在after observable的上下文中接收序列化通知。