NewThreadScheduler.Default计划在同一个线程上的所有工作

我目前正试图用RX .NET来解决并发问题,并对某些事情感到困惑。 我想并行运行四个相对较慢的任务,所以我假设NewThreadScheduler.Default将是要走的路,因为它“代表一个在一个单独的线程上调度每个工作单元的对象”。

这是我的设置代码:

  static void Test() { Console.WriteLine("Starting. Thread {0}", Thread.CurrentThread.ManagedThreadId); var query = Enumerable.Range(1, 4); var obsQuery = query.ToObservable(NewThreadScheduler.Default); obsQuery.Subscribe(DoWork, Done); Console.WriteLine("Last line. Thread {0}", Thread.CurrentThread.ManagedThreadId); } static void DoWork(int i) { Thread.Sleep(500); Console.WriteLine("{0} Thread {1}", i, Thread.CurrentThread.ManagedThreadId); } static void Done() { Console.WriteLine("Done. Thread {0}", Thread.CurrentThread.ManagedThreadId); } 

我假设“X线程Y”每次都会输出不同的线程ID,但实际输出是:

 Starting. Thread 1 Last line. Thread 1 1 Thread 3 2 Thread 3 3 Thread 3 4 Thread 3 Done. Thread 3 

所有工作都按顺序在同一个新线程上进行,这不是我所期待的。

我假设我错过了什么,但我无法弄清楚是什么。

可观察查询有两个部分: Query本身和Subscription 。 (这也是ObserveOn和SubscribeOn运算符之间的区别。)

你的Query

 Enumerable .Range(1, 4) .ToObservable(NewThreadScheduler.Default); 

这将创建一个observable,在该系统的默认NewThreadScheduler生成值

您的订阅是

 obsQuery.Subscribe(DoWork, Done); 

Query使用OnComplete调用Done时,这将为QueryDone生成的每个值运行DoWork 。 我不认为有什么保证会调用subscribe方法中的函数,实际上如果查询的所有值都是在运行订阅的线程的同一个线程上产生的。 看起来他们也在制作它所以所有的订阅调用都是在同一个线程上进行的,这很可能是为了摆脱许多常见的multithreading错误。

因此,如果您将Query更改为,则有两个问题,一个是您的日志记录

 Enumerable .Range(1, 4) .Do(x => Console.WriteLine("Query Value {0} produced on Thread {1}", x, Thread.CurrentThread.ManagedThreadId); .ToObservable(NewThreadScheduler.Default); 

您将看到在新线程上生成的每个值。

另一个问题是Rx的意图和设计之一。 这是一个长期运行的过程,而Subscription是一个处理结果的简短方法。 如果要将长时间运行的函数作为Rx Observable运行,最好的选择是使用Observable.ToAsync 。