使用ObserveOn时如何处理OnNext中的exception?

当我使用ObserveOn(Scheduler.ThreadPool)时,观察者在OnNext抛出错误时,我的应用程序终止。 我发现处理这个问题的唯一方法是使用下面的自定义扩展方法(除了确保OnNext永远不会抛出exception)。 然后确保每个ObserveOn后跟一个ExceptionToError

  public static IObservable ExceptionToError(this IObservable source) { var sub = new Subject(); source.Subscribe(i => { try { sub.OnNext(i); } catch (Exception err) { sub.OnError(err); } } , e => sub.OnError(e), () => sub.OnCompleted()); return sub; } 

但是,这感觉不对。 有没有更好的方法来处理这个?

由于未捕获的exception,此程序崩溃。

 class Program { static void Main(string[] args) { try { var xs = new Subject(); xs.ObserveOn(Scheduler.ThreadPool).Subscribe(x => { Console.WriteLine(x); if (x % 5 == 0) { throw new System.Exception("Bang!"); } }, ex => Console.WriteLine("Caught:" + ex.Message)); // <- not reached xs.OnNext(1); xs.OnNext(2); xs.OnNext(3); xs.OnNext(4); xs.OnNext(5); } catch (Exception e) { Console.WriteLine("Caught : " + e.Message); // <- also not reached } finally { Console.ReadKey(); } } } 

我们在RC版本中从Rx v2.0开始解决这个问题。 您可以在我们的博客http://blogs.msdn.com/rxteam上阅读所有相关内容。 它基本上归结为管道本身更严格的error handling,结合了SubscribeSafe扩展方法(在订阅期间将错误重定向到OnError通道),以及IScheduler上的Catch扩展方法(用于包含调度程序的调度程序)动作)。

关于这里提出的ExceptionToError方法,它有一个缺陷。 回调运行时,IDisposable订阅对象仍然可以为null; 有一个基本的竞争条件。 要解决此问题,您必须使用SingleAssignmentDisposable。

订阅中的错误与可观察的错误之间存在差异。 快速测试:

 var xs = new Subject(); xs.Subscribe(x => { Console.WriteLine(x); if (x % 3 == 0) throw new System.Exception("Error in subscription"); }, ex => Console.WriteLine("Error in source: " + ex.Message)); 

运行这个,你会在源代码中得到一个很好的处理错误:

 xs.OnNext(1); xs.OnNext(2); xs.OnError(new Exception("from source")); 

使用此方法运行,您将在订阅中收到未处理的错误:

 xs.OnNext(1); xs.OnNext(2); xs.OnNext(3); 

您的解决方案所做的是在订阅中记录错误并在源中使它们出错 。 并且您已在原始流上完成此操作,而不是基于每个订阅。 您可能会或可能不会这样做,但几乎肯定是错的。

“正确”的方法是将您需要的error handling直接添加到订阅操作,这是它所属的位置。 如果您不想直接修改订阅function,可以使用一个小帮手:

 public static Action ActionAndCatch(Action action, Action catchAction) { return item => { try { action(item); } catch (System.Exception e) { catchAction(e); } }; } 

现在使用它,再次显示不同错误之间的差异:

 xs.Subscribe(ActionAndCatch(x => { Console.WriteLine(x); if (x % 3 == 0) throw new System.Exception("Error in subscription"); }, ex => Console.WriteLine("Caught error in subscription: " + ex.Message)), ex => Console.WriteLine("Error in source: " + ex.Message)); 

现在我们可以(单独)处理源中的错误和订阅中的错误。 当然,可以在方法中定义任何这些操作,使上面的代码像(可能)一样简单:

 xs.Subscribe(ActionAndCatch(Handler, ExceptionHandler), SourceExceptionHandler); 

编辑

在评论中,我们开始讨论订阅中的错误指向流本身中的错误这一事实,并且您不希望该流上的其他订阅者。 这是一个完全不同类型的问题 。 我倾向于编写一个可观察的Validate扩展来处理这种情况:

 public static IObservable Validate(this IObservable source, Predicate valid) { return Observable.Create(o => { return source.Subscribe( x => { if (valid(x)) o.OnNext(x); else o.OnError(new Exception("Could not validate: " + x)); }, e => o.OnError(e), () => o.OnCompleted() ); }); } 

然后简单易用,无需混淆隐喻(仅在源代码中出现错误):

 xs .Validate(x => x != 3) .Subscribe(x => Console.WriteLine(x), ex => Console.WriteLine("Error in source: " + ex.Message)); 

如果您仍希望在Subscribe抑制exception,则应使用其他讨论的方法之一。

您当前的解决方案并不理想。 正如其中一位Rx人员所说:

Rx运算符不会捕获对OnNext,OnError或OnCompleted的调用中发生的exception。 这是因为我们期望(1)观察者实现者最好地知道如何处理这些exception,我们不能对它们做任何合理的事情。(2)如果发生exception,那么我们希望它冒出来而不是由Rx处理。

您当前的解决方案获取IObservable来处理IObserver抛出的错误,这在语义上没有意义,因为IObservable不应该知道观察它的事情。 请考虑以下示例:

 var errorFreeSource = new Subject(); var sourceWithExceptionToError = errorFreeSource.ExceptionToError(); var observerThatThrows = Observer.Create(x => { if (x % 5 == 0) throw new Exception(); }, ex => Console.WriteLine("There's an argument that this should be called"), () => Console.WriteLine("OnCompleted")); var observerThatWorks = Observer.Create( x => Console.WriteLine("All good"), ex => Console.WriteLine("But definitely not this"), () => Console.WriteLine("OnCompleted")); sourceWithExceptionToError.Subscribe(observerThatThrows); sourceWithExceptionToError.Subscribe(observerThatWorks); errorFreeSource.OnNext(1); errorFreeSource.OnNext(2); errorFreeSource.OnNext(3); errorFreeSource.OnNext(4); errorFreeSource.OnNext(5); Console.ReadLine(); 

这里没有源或者观察者的问题,但是由于与另一个Observer的无关错误,它的OnError将被调用。 要阻止不同线程中的exception结束进程,您必须在该线程中捕获它们,因此在观察者中放置一个try / catch块。

你是对的 – 应该感觉很糟糕。 使用和返回这样的主题不是一个好方法。

至少你应该像这样实现这个方法:

 public static IObservable ExceptionToError(this IObservable source) { return Observable.Create(o => { var subscription = (IDisposable)null; subscription = source.Subscribe(x => { try { o.OnNext(x); } catch (Exception ex) { o.OnError(ex); subscription.Dispose(); } }, e => o.OnError(e), () => o.OnCompleted()); return subscription; }); } 

请注意,没有使用主题,如果我发现错误,我会处理订阅以防止序列继续发生错误。

但是,为什么不在订阅中添加OnError处理程序。 有点像这样:

 var xs = new Subject(); xs.ObserveOn(Scheduler.ThreadPool).Subscribe(x => { Console.WriteLine(x); if (x % 5 == 0) { throw new System.Exception("Bang!"); } }, ex => Console.WriteLine(ex.Message)); xs.OnNext(1); xs.OnNext(2); xs.OnNext(3); xs.OnNext(4); xs.OnNext(5); 

此代码在订阅中正确捕获错误。

另一种方法是使用Materialize扩展方法,但这可能有点过分,除非上述解决方案不起作用。