Rx如何并行化长时间运行的任务?

我有以下代码片段,它枚举了一些xml的元素(从svn log --xml ...进程的输出中读取),然后为每个xml元素运行一个长时间运行的方法。

 var proc = Process.Start(svnProcInfo); var xml = XDocument.Load(proc.StandardOutput); var xElements = xml.Descendants("path") .ToObservable() //.SubscribeOn(ThreadPoolScheduler.Instance) .Select(descendant => return LongRunning(descendant)); xElements //.SubscribeOn(NewThreadScheduler.Default) .Subscribe(result => Console.WriteLine(result); Console.ReadKey(); 

LongRunning方法并不重要,但在其中我记录了它运行的线程。 我们假设它运行一整秒。

我的问题是,取消注释SubscribeOn()行没有任何效果。 对LongRunning的调用是顺序的,并且每隔一秒发生在同一个线程上(尽管与主(初始)线程不同)。

这是一个控制台应用程序。

我是Rx的新手。 我错过了什么?

编辑:

在尝试了Lee Campbell的回答之后,我注意到了另一个问题。

 Console.Error.WriteLine("Main thread " + Thread.CurrentThread.ManagedThreadId); var xElements = xml.Descendants("path").ToObservable() //.ObserveOn(Scheduler.CurrentThread) .SelectMany(descendant => Observable.Start(()=>LongRunning(descendant),NewThreadScheduler.Default)) .Subscribe(result => Console.WriteLine( "Result on: " + Thread.CurrentThread.ManagedThreadId)); [...] string LongRunning(XElement el) { Console.WriteLine("Execute on: Thread " + Thread.CurrentThread.ManagedThreadId); DoWork(); Console.WriteLine("Finished on Thread " + Thread.CurrentThread.ManagedThreadId); return "something"; } 

这给出了以下输出:

 Main thread 1 Execute on: Thread 3 Execute on: Thread 4 Execute on: Thread 5 Execute on: Thread 6 Execute on: Thread 7 Finished on Thread 5 Finished on Thread 6 Result on: 5 Result on: 6 Finished on Thread 7 Result on: 7 Finished on Thread 3 Result on: 3 Finished on Thread 4 Result on: 4 Done! Press any key... 

我需要的是一种将结果“排队”到同一个线程的方法。 我认为这就是ObserveOn()的用途,但是对上面的ObserveOn()行进行取消注释并不会改变结果。

首先,Rx是用于控制异步的库(或范例),特别是可观察的序列。 你在这里有一个可枚举的序列(Xml Descendants)和一个阻塞/同步LongRunning方法调用。

通过在可枚举序列上调用ToObservable() ,您实际上只是遵守接口,但是当您的序列被实现时(急切而不是懒惰),没有任何关于它的Observable / Async。

通过调用SubscribeOn ,您有了正确的想法,但转换已在ToObservable()运算符中完成。 您可能打算调用ToObservable(ThreadPoolScheduler.Instance)以便可以在另一个线程上完成IEnumerable任何慢速迭代。 但是……我认为这不会是一个缓慢的迭代器,所以这可能无法解决任何问题。

您最想要做的事情(如果Rx是此类问题的最佳工具,这是可疑的)是安排调用LongRunning方法。 但是,这意味着您需要将Asyncrony添加到您的选择中。 一个很好的方法是使用Observable.FromAsyncObservable.Start类的Rx Factory方法。 但是,这将使您的序列成为IObservable> 。 您可以使用SelectManyMerge其展平。

说完这一切之后,我想你想做的是:

 var proc = Process.Start(avnProcInfo); var xml = XDocument.Load(proc.StandardOutput); //EDIT: Added ELS to serialise results onto a single thread. var els = new EventLoopScheduler(threadStart=>new Thread(threadStart) { IsBackground=true, Name="MyEventLoopSchedulerThread" }); var xElements = xml.Descendants("path").ToObservable() .SelectMany(descendant => Observable.Start(()=>LongRunning(descendant),ThreadPoolScheduler.Instance)) .ObserveOn(els) .Subscribe(result => Console.WriteLine(result)); Console.ReadKey();