为什么从给定订阅者抛出时从未调用OnError回调?

请观察以下unit testing:

using System; using System.Reactive.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; namespace UnitTests { [TestClass] public class TestRx { public const int UNIT_TEST_TIMEOUT = 5000; private static IObservable GetObservable(int count = 100, int msWait = 10) { return Observable.Create(async (obs, cancellationToken) => { for (int i = 0; i  { Thread.Sleep(msWait); return value; })); } }); } [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)] public void Subscribe() { var tcs = new TaskCompletionSource(); int i = 0; GetObservable().Subscribe(n => { Assert.AreEqual(i, n); ++i; }, e => Assert.Fail(), () => { Assert.AreEqual(100, i); tcs.TrySetResult(null); }); tcs.Task.Wait(); } [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)] public void SubscribeCancel() { var tcs = new TaskCompletionSource(); var cts = new CancellationTokenSource(); int i = 0; GetObservable().Subscribe(n => { Assert.AreEqual(i, n); ++i; if (i == 5) { cts.Cancel(); } }, e => { Assert.IsTrue(i  { Assert.IsTrue(i < 100); tcs.TrySetResult(null); }, cts.Token); tcs.Task.Wait(); } [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)] public void SubscribeThrow() { var tcs = new TaskCompletionSource(); int i = 0; GetObservable().Subscribe(n => { Assert.AreEqual(i, n); ++i; if (i == 5) { throw new Exception("xo-xo"); } }, e => { Assert.AreEqual("xo-xo", e.Message); tcs.TrySetResult(null); }, Assert.Fail); tcs.Task.Wait(); } } } 

该unit testingSubscribeCancelSubscribeThrow超时,因为从未调用OnError回调,因此等待任务永远不会结束。

怎么了?

PS

这个问题与如何正确包装SqlDataReader和IObservable有关?

编辑

与此同时,我创建了一个新的Rx问题 – https://rx.codeplex.com/workitem/74

另外http://social.msdn.microsoft.com/Forums/en-US/5d0a4808-3ee0-4ff0-ab11-8cd36460cd66/why-is-the-onerror-callback-never-called-when-throwing-from-the -given订户?论坛= RX

EDIT2

以下观察者实现产生完全相同的结果,即使它符合Rx设计指南的第6.5段 – “订阅实现不应抛出”:

 private static IObservable GetObservable(int count = 100, int msWait = 10) { return Observable.Create(async (obs, cancellationToken) => { try { for (int i = 0; i  { Thread.Sleep(msWait); return value; })); } obs.OnCompleted(); } catch (Exception exc) { obs.OnError(exc); } }); } 

EDIT3

我开始相信,当异步可观察序列被集成到一个其他同步代码中时,应该编写一个这样的代码(在一个地方或另一个地方的服务器端通常是这种情况):

 var tcs = new TaskCompletionSource(); GetObservable().Subscribe(n => { try { ... } catch (Exception e) { DoErrorLogic(); tcs.TrySetException(e); } }, e => { DoErrorLogic(); tcs.TrySetException(e); }, () => { DoCompletedLogic(); tcs.TrySetResult(null); }); tcs.Task.Wait(); 

真的是这样吗?

编辑4

我想它终于开始涓涓细流了我想说的东西。 我现在将切换到我的其他post – 如何正确地使用IObservable包装SqlDataReader?

此行为是设计使然。 如果订阅者抛出exception(顺便说一下这是不好的做法),Rx框架正确地说它已经死了并且不再进行通信。 如果订阅被取消,这也不是错误 – 只是要求不发送任何其他类型的事件 – 这是Rx所尊重的。

编辑以回应评论

我不认为在文档中有一个简单的参考点 – 你所看到的行为是如此内在,它是隐含的。 我能得到的最接近的是指向AnonymousSafeObserver和AutoDetatchObserver的源代码。 后者有一个可能有帮助的解释性方案,但它有点牵扯。

也许类比会有所帮助。 想象一下,数据流事件是由报刊经纪人发布的报纸。 而订户是家庭。

订阅者抛出exception

报刊经理愉快地发送报纸直到有一天,其中一位订阅者 – 琼斯先生 – 留下他的汽油,他的房子爆炸杀死琼斯先生并摧毁房子(抛出未处理的例外)。 报刊经销商意识到他不能再向琼斯先生发送报纸,他也不能发送终止通知,报纸供应也没有问题(因此OnError或OnCompleted不合适)并且报刊经纪人继续减少用户数量。

与此相反,报纸印刷商无意中使用易燃油墨并将工厂送入火焰中。 现在,可怜的报刊经纪人必须向所有供应已无限期停止的订户发送解释性说明(OnError)。

订阅者取消订阅

琼斯先生正在收到他订阅的报纸,直到有一天他决定厌倦了无数令人沮丧的故事,并要求取消他的订阅。 新闻通讯员有责任。 他没有给琼斯先生发一张纸条,说明报纸已停止印刷版(没有OnCompleted) – 他们没有。 他也没有给琼斯先生发一张纸条,说明报纸已经停业(没有OnError) – 正如琼斯先生所要求的那样,他只是停止发送报纸。

对Edit3的回应

我同情你的斗争。 我在整个代码中都注意到你一直试图将TPL(任务)习惯用法与Rx进行网格化。 这种尝试经常感到笨拙,因为它们确实是完全不同的世界。 评论这样的段落很难:

我开始相信,当异步可观察序列被集成到一个其他同步代码中时,应该编写一个这样的代码(在一个地方或另一个地方的服务器端通常是这种情况):

与Brandon精心设计的断言强烈赞同,我无法想象在您尝试的方式中将异步代码与服务器端的同步代码集成在一起的实例。 这对我来说就像是一种设计气味。 通俗地说,人们会尝试保持代码的react native – 进行订阅,并让订阅者处理react native工作。 我不记得有必要按照你描述的方式过渡到同步代码。

当然,看看你在Edit3中编写的代码,目前还不清楚你想要实现的目标。 消息来源不负责对订户中的错误做出反应。 这是摇尾巴的尾巴。 需要在那里确保订户服务连续性的exception处理程序应该在订阅处理代码中,而不是在源可观察的代码中 – 它应该仅关注防止流氓观察者行为。 这种逻辑在上面链接的AnonymousSafeObserver中实现,并且被大多数Rx提供的运算符使用。 可观察量很可能具有处理其数据连续性的逻辑 – 但这是一个不同的问题,而不是您在代码中解决的问题。

无论您ToTask尝试通过调用ToTaskWait来桥接同步代码,都可能需要仔细考虑您的设计。

我觉得提供一个更具体的问题陈述 – 可能来自你试图解决的现实世界场景 – 将有助于为你找到更有用的建议。 您说的’SqlDataReader`示例…

最后人们可以通过订阅直接使用observable [包装SqlDataReader],但是他们必须在某个时候等待结束(阻塞线程),因为大多数代码仍然是同步的。

…突出显示你所处的设计泥潭。在这种情况下,你推断这样的消费者显然会更好地使用IEnumerable接口 – 或者可能要求IObservable> 。 但关键是要看大局,你试图将SqlDataReader包装在一个可观察的包装器的事实是一种设计气味 – 因为这是一个固定数据供应以响应特定的一次性请求。 这可能是一个异步场景 – 但并非真正的反应场景。 与更常见的反应情景形成对比,比如“只要你拿到它就给股票X发送价格”,你在那里完全按照来源的要求设置未来的数据流,然后用户就会做出反应。

它没有在指南中明确说明,但它是由Rx语法和IObservables的目的所暗示的。 IObservables将来自源的信息传递给一个或多个观察者 。 传递的信息是数据(通过OnNext ),可选地后跟OnCompletedOnError 。 重要的是要记住这些回调是由触发的。 它们不能也不应该由观察者触发。

如果调用OnError ,那将是因为源可观察链中的某些内容失败了。 永远不会因为观察者失败了。

在您的SubscribeThrow示例中, 观察者 (由您提供的3个为OnNextOnErrorOnCompleted提供的lambda构造)失败。 观察者中的这些错误不能也不应该导致源可观察本身失败。

RX 2.0引入了保障措施以确保此合同。 阅读RX 2.0发布博客文章的“改版error handling策略”部分。

相关问题: 如何在使用ObserveOn时处理OnNext中的exception?

EDIT3

这当然是一种方法,但它非常难看。 首先,我将挑战你的断言,异步服务器端代码通常最终需要同步与某些同步代码进行交互。 我发现只有在unit testing中才是真的。

但无论如何,我相信你只是过早订阅。 我对Rx的体验是每当我遇到摩擦时,都是因为我订阅太快而应该扩展可观察的monad链。

在您的示例中,不要订阅数据流并在观察者中处理它,而是将您的处理器逻辑视为传入数据的另一个投影。 在这种情况下,您的逻辑只是将一段数据转换为工作结果。 这使您可以将逻辑的成功或失败视为流的一部分,然后您可以按照自己的方式进行观察。 你最终得到这个:

 var data = GetObservable(); var results = data.Select(item => { DoWork(item); // since your work does not produce anything... // it either succeeds or throws an exception // and you cannot make an Observable // return Unit.Default. Unit is the Rx equivalent of // void. return Unit.Default; }); // subscribe to the stream and wait synchronously for it to finish results.Wait(); // this will throw an exception the first time DoWork fails // or asynchronously await the stream to finish...just like a Task await results; // or turn the stream into a Task that completes when the processing is complete. var task = results.ToTask(); 

或者,如果您不想在第一个错误上停止处理,而只是累积错误,该怎么办? 现在,您可以将自己的工作视为投影……

 var results = data.Select(item => { try { DoWork(item); return null; // no error } catch (Exception e) { return e; } }).Where(e => e != null).ToList(); var errorList = results.Wait(); // or var errorList = await results; // or Task> errorListTask = results.ToTask(); 

这两个例子看起来都更简单,更清晰,而且可以通过不同的方式思考问题。