抛出EventLoopScheduler后RX2.0:ObjectDisposedException

我们最近将系统从RX 1.11111移植到RX 2.0并发现了这个问题。 我们对ObserveOn使用EventLoopScheduler,如下所示:

IDisposable subscription = someSubject .ObserveOn(m_eventLoopScheduler) .SomeMoreRXFunctions() .Subscribe((something)=>something) 

调度程序m_eventLoopScheduler.Dispose在应用程序出口( m_eventLoopScheduler.Dispose )上。 在此之前,我们处理observable( subscription.Dispose )的所有subscription.Dispose

尽管如此,我们在EventLoopScheduler.Schedule中得到了一个ObjectDisposedException 。 捕获该exception是不可能的,因为它起源于RX线程。 这几乎就像Dispose没有摆脱某些队列中的所有项目。

我们试图删除对EventLoopScheduler.Dispose的调用,exception消失了。 但是, SomeMoreRXFunctions()的代码执行了大约10次,尽管所有订阅都被处理掉了。

还有其他方法可以正确关闭EventLoopScheduler吗?

关于订阅的一些观察

(对不起,无法抗拒双关语!) IObservable ,几乎每个Rx运算符实现的接口,只有一个重要的方法:

 IDisposable Subscribe(IObserver observer); 

纯粹通过这种方法并处理它的返回值,观察者(实现IObserver )可以确定订阅的开始和结束时间。

通常 (直接或间接)对作为链的一部分的Observable进行订阅时,这将导致在链中进一步订阅。 确切地说,如果发生这种情况,那么就是Observable。

在许多情况下,订阅收到的订阅之间的关系不是一对一的。 一个例子是Publish(),它最多只能有一个订阅源,无论它收到多少订阅。 这真的是Publish的重点。

在其他情况下,这种关系具有时间方面。 例如,Concat()不会订阅它的第二个流,直到第一个流具有OnCompleted() – 这可能永远不会!

值得花一点时间来研究Rx设计指南 ,因为它们有一些非常相关的话要说:

Rx设计指南

4.4。 尽最大努力停止取消订阅的所有优秀工作。 当在可观察订阅上调用取消订阅时,可观察序列将尽最大努力尝试停止所有未完成的工作。 这意味着任何尚未启动的排队工作都将无法启动。

任何正在进行的工作仍可能完成,因为中止正在进行的工作并不总是安全的。 此工作的结果不会发送给任何先前订阅的观察者实例。

底线

注意这里的含义; 最重要的是, 当任何上游订阅可能被制作或处置时 ,它完全取决于Observable的实现 。 换句话说, 绝对不能保证处理订阅会导致Observable处置它直接或间接发出的任何或所有订阅。 这适用于运营商或其上游订阅使用的任何其他资源(例如计划的操作)。

您可以期待的最好的是每个上游运营商的作者确实尽最大努力阻止所有出色的工作。

回到问题(最后!)

在没有看到SomeMoreRXFunctions的内容的SomeMoreRXFunctions我无法确定,但看起来很可能是您所看到的exception是因为 – 尽管处置了您所知道的订阅 – 通过处置调度程序,您已经从下面撕下了地毯还在运行订阅的脚。 实际上,你是造成这样的:

 void Main() { var scheduler = new EventLoopScheduler(); // Decide it's time to stop scheduler.Dispose(); // The next line will throw an ObjectDisposedException scheduler.Schedule(() => {}); } 

编写一个完全合理的运算符很容易导致这个问题 – 即使是不直接使用调度程序的运算符! 考虑一下:

 public static class ObservableExtensions { public static IObservable ReasonableDelay (this IObservable source, IObservable delay) { return Observable.Create(observer => { var subscription = new SerialDisposable(); subscription.Disposable = delay .IgnoreElements() .Subscribe(_ => {}, () => { Console.WriteLine("Waiting to subscribe to source"); // Artifical sleep to create a problem Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine("Subscribing to source"); // Is this line safe? subscription.Disposable = source.Subscribe(observer); }); return subscription; }); } } 

一旦传递的延迟可观察完成,该运营商将订阅源。 看看它有多合理 – 它使用一个SerialDisposable来正确地将两个潜在的单独订阅作为一个一次性的观察者呈现给它。

但是,破坏此运算符并使其导致exception是微不足道的:

 void Main() { var scheduler = new EventLoopScheduler(); var rx = Observable.Range(0, 10, scheduler) .ReasonableDelay(Observable.Timer(TimeSpan.FromSeconds(1))); var subs = rx.Subscribe(); Thread.Sleep(TimeSpan.FromSeconds(2)); subs.Dispose(); scheduler.Dispose(); } 

这里发生了什么事? 我们在EventLoopScheduler上创建一个Range ,但是使用它的默认调度程序将我们的ReasonableDelay与使用Timer创建的延迟流相关联。

现在我们订阅,等到我们的延迟流完成,然后我们按照“正确的顺序”处理我们的订阅和EventLoopScheduler。

我使用Thread.Sleep插入的人工延迟确保了一种可以轻易自然发生的竞争条件 – 延迟已经完成,订阅已经处理但是为了防止Range操作员访问已处置的EventLoopScheduler为时已晚。

我们甚至可以加强我们的合理努力,以检查观察员是否在延迟部分完成后取消订阅:

 // In the ReasonableDelay method .Subscribe(_ => {}, () => { if(!subscription.IsDisposed) // Check for unsubscribe { Console.WriteLine("Waiting to subscribe to source"); // Artifical sleep to create a problem Thread.Sleep(TimeSpan.FromSeconds(2)); Console.WriteLine("Subscribing to source"); // Is this line safe? subscription.Disposable = source.Subscribe(observer); } }); 

它无济于事。 纯粹在此运算符的上下文中也无法使用锁定语义。

你做错了什么

你没有处理EventLoopScheduler的业务! 一旦将其传递给其他Rx操作员,您就已经承担了责任。 由Rx运营商遵循指南,尽可能及时地清理他们的订阅 – 这意味着直接或间接取消EventLoopScheduler上的任何待处理的计划项目并停止任何进一步的计划,以便它的队列清空可能。

在上面的示例中,您可以将问题归因于多个调度程序的有些人为的使用以及ReasonableDelay中的强制Sleep – 但是对于操作员无法立即清理的真实场景并不难。

本质上,通过配置Rx调度程序,您正在执行Rx等效的线程中止。 就像那个场景一样,你可能有例外处理!

正确的做法是拆开神秘的SomeMoreRXFunctions()并确保它们尽可能地遵守指南。

刚刚注意到这个问题作为这个问题的链接: 处理后的Reactive Rx 2.0 EventLoopScheduler ObjectDisposedException

应该在这里重新发布我在那里所做的事情 – 我不知道有什么方法可以“刷新”调度程序,但是你可以用这种方式包装/处理不可避免的“对象处置”exception:

 EventLoopScheduler scheduler = new EventLoopScheduler(); var wrappedScheduler = scheduler.Catch((ex) => { Console.WriteLine("Got an exception:" + ex.ToString()); return true; }); for (int i = 0; i < 100; ++i) { var handle = Observable.Interval(TimeSpan.FromMilliseconds(1)) .ObserveOn(wrappedScheduler) .Subscribe(Observer.Create((x) => Thread.Sleep(1000))); handles.Add(handle); } 

我们遇到了同样的问题,并最终执行以下操作来处置EventLoopScheduler,没有例外:

scheduler.Schedule(() => scheduler.Dispose());

如果你在执行此操作之前正确处理了所有订阅(你说你做了),那么在调用Dispose之前,Dipose()调用是最后一次调度操作,所有其他挂起操作都可以完成。

为了使其更加健壮/可重用,您可以创建自己的IScheduler实现,包含EventLoopScheduler,它将所有操作委托给它+实现Dispose,如上所示。 最重要的是,您可以在Schedule方法中实现防护,以防止在调用Dispose后调度操作(例如,如果您忘记取消订阅某些观察者)。

部分解决了。 案件比较复杂,然后在这里显示。 链是这样的:

var published = someSubject.ObserveOn(m_eventLoopScheduler).SomeMoreRXFunctions()。Publish();

IDisposable disposable1 = published.Connect();

IDisposable disposable2 = published.Subscribe((something)=> something);

如果我同时处理了disposable1和disposable2,则SomeMoreRXFunctions()中的代码不再执行。 另一方面,尝试处理调度程序本身仍然会抛出相同的exception。

不幸的是,我无法用更简单的代码重现这个问题。 这可能表明我还缺少一些东西。

这是一个我们可以忍受的解决方案,但我仍然希望找到更好的方法,一次性关闭调度程序,没有exception的机会。