为什么每个观察委托都在一个新线程上运行

在Rx中使用Scheduler.NewThread进行ObserveOn方法时,当Rx已经保证OnNexts永远不会重叠时,让每个Observation委托(OnNext)在新线程上运行的优点是什么。 如果要逐个调用每个OnNext,为什么需要为每个OnNext创建新线程。

我理解为什么一个人想在一个不同于订阅和应用程序线程的线程上运行Observation委托,但是当它们永远不会并行运行时,在新线程上运行每个观察委托?….对我来说没有意义我在这里遗漏了什么?

例如

using System; using System.Linq; using System.Reactive.Concurrency; using System.Reactive.Linq; using System.Threading; namespace RxTesting { class Program { static void Main(string[] args) { Console.WriteLine("Application Thread : {0}", Thread.CurrentThread.ManagedThreadId); var numbers = from number in Enumerable.Range(1,10) select Process(number); var observableNumbers = numbers.ToObservable() .ObserveOn(Scheduler.NewThread) .SubscribeOn(Scheduler.NewThread); observableNumbers.Subscribe( n => Console.WriteLine("Consuming : {0} \t on Thread : {1}", n, Thread.CurrentThread.ManagedThreadId)); Console.ReadKey(); } private static int Process(int number) { Thread.Sleep(500); Console.WriteLine("Producing : {0} \t on Thread : {1}", number, Thread.CurrentThread.ManagedThreadId); return number; } } } 

上面的代码产生以下结果。 请注意,每次都在新线程上完成Consuming。

 Application Thread : 8 Producing : 1 on Thread : 9 Consuming : 1 on Thread : 10 Producing : 2 on Thread : 9 Consuming : 2 on Thread : 11 Producing : 3 on Thread : 9 Consuming : 3 on Thread : 12 Producing : 4 on Thread : 9 Consuming : 4 on Thread : 13 Producing : 5 on Thread : 9 Consuming : 5 on Thread : 14 Producing : 6 on Thread : 9 Consuming : 6 on Thread : 15 Producing : 7 on Thread : 9 Consuming : 7 on Thread : 16 Producing : 8 on Thread : 9 Consuming : 8 on Thread : 17 Producing : 9 on Thread : 9 Consuming : 9 on Thread : 18 Producing : 10 on Thread : 9 Consuming : 10 on Thread : 19 

NewThread调度程序对长期运行的订阅者很有用。 如果未指定任何调度程序,则会阻止生产者等待订阅者完成。 通常,您可以使用Scheduler.ThreadPool,但是如果您希望有许多长时间运行的任务,则不希望用它们堵塞您的线程池(因为它可能不仅仅被单个可观察对象的订阅者使用) )。

例如,请考虑对您的示例进行以下修改。 我将延迟移动到订户并添加了主线程何时准备好进行键盘输入的指示。 注意取消注释NewThead行时的区别。

 using System; using System.Linq; using System.Reactive.Concurrency; using System.Reactive.Linq; using System.Threading; namespace RxTesting { class Program { static void Main(string[] args) { Console.WriteLine("Application Thread : {0}", Thread.CurrentThread.ManagedThreadId); var numbers = from number in Enumerable.Range(1, 10) select Process(number); var observableNumbers = numbers.ToObservable() // .ObserveOn(Scheduler.NewThread) // .SubscribeOn(Scheduler.NewThread) ; observableNumbers.Subscribe( n => { Thread.Sleep(500); Console.WriteLine("Consuming : {0} \t on Thread : {1}", n, Thread.CurrentThread.ManagedThreadId); }); Console.WriteLine("Waiting for keyboard"); Console.ReadKey(); } private static int Process(int number) { Console.WriteLine("Producing : {0} \t on Thread : {1}", number, Thread.CurrentThread.ManagedThreadId); return number; } } } 

那么为什么Rx不优化为每个用户使用相同的线程呢? 如果订阅者长时间运行以至于您需要一个新线程,那么线程创建开销无论如何都是微不足道的。 一个例外是,如果大多数订阅者很短但有一些订阅者长时间运行,那么重用相同线程的优化确实很有用。

我不确定你是否注意到但是如果消费者比生产者慢(例如,如果你在订阅动作中添加更长的睡眠)他们将共享相同的线程,所以它可能是一种确保订阅者消费的机制内容一发布即可。

 namespace RxTesting { class Program { static void Main(string[] args) { Console.WriteLine("Application Thread : {0}", Thread.CurrentThread.ManagedThreadId); var numbers = from number in Enumerable.Range(1,10) select Process(number); var observableNumbers = numbers.ToObservable() .ObserveOn(Scheduler.NewThread) .SubscribeOn(Scheduler.NewThread); observableNumbers.Subscribe( n => { Console.WriteLine("Consuming : {0} \t on Thread : {1}", n, Thread.CurrentThread.ManagedThreadId); Thread.Sleep(600); } ); Console.ReadKey(); } private static int Process(int number) { Thread.Sleep(500); Console.WriteLine("Producing : {0} \t on Thread : {1}", number, Thread.CurrentThread.ManagedThreadId); return number; } } } 

输出:

 Application Thread : 1 Producing : 1 on Thread : 3 Consuming : 1 on Thread : 4 Producing : 2 on Thread : 3 Consuming : 2 on Thread : 4 Producing : 3 on Thread : 3 Consuming : 3 on Thread : 4 Producing : 4 on Thread : 3 Consuming : 4 on Thread : 4 Producing : 5 on Thread : 3 Consuming : 5 on Thread : 4 Producing : 6 on Thread : 3 Consuming : 6 on Thread : 4 Producing : 7 on Thread : 3 Producing : 8 on Thread : 3 Consuming : 7 on Thread : 4 Producing : 9 on Thread : 3 Consuming : 8 on Thread : 4 Producing : 10 on Thread : 3 Consuming : 9 on Thread : 4 Consuming : 10 on Thread : 4