在不停止序列的情况下处理Reactive Extensions中的exception

为什么RX具有以下语法OnNext* (OnError|OnCompleted) ? 而不是(OnNext|OnError)* OnCompleted ? 从实现角度来看这是非常清楚的(这也与IEnumerableyield有共同的语义),但我想与现实生活情况不同。 在现实生活中 – 生产者生成混合的数据流和exception(exception不会破坏生产者)。

问题:如果我理解正确,唯一可能的解决方案是使可观察的返回复杂数据结构从初始数据和生成的exception( Observable.Timestamp().TimeInterval()具有类似的概念)组合或是否还有其他选项?


目前我得出了以下解决方案:在可观察的生产者内部我手动处理exeptions并将它们传递给以下数据结构,这是我的可观察结果

 public class ValueOrException { private readonly Exception ex; private readonly T value; public ValueOrException(T value, Exception ex) { this.value = value; this.ex = ex; } public ValueOrException(T value) { this.value = value; } public T Value { get { return this.value; } } public Exception Ex { get { return this.ex; } } } 

如果消费者是知道是否预期Exception的消费者,那么我说在这里抛出exception是不正确的。

你本质上是在尝试传达状态或逻辑,这些exception并不意味着要做。 例外意味着将系统(或至少是当前操作)暂停,因为发生了一些代码本身并不知道如何从中恢复的事情。

在这种情况下,恰好是Exception是业务逻辑/状态的一部分,但这并不意味着你可以抛出它们。 您需要将它们下游传递给您的消费者,然后让您的消费者处理它们。

一个Tuple在这里可以解决,但是对于类型(以及属性)缺乏特异性通常是混乱的,所以我建议创建一个专用类型来传达操作的结果和通过IObservable公开。

OnError意味着发生了不可恢复的错误,因此应该重置流。 恢复之后,下一个事件在exception之前发生的事件的上下文中没有意义。

例如,考虑事件流:

  • 用户加入了聊天
  • 用户加入了聊天
  • 聊天崩溃,所有用户都被踢了
  • 用户加入了聊天

现在,流消费者使用聚合function来显示聊天的数量。 最后应该展示什么? 当然是1,而不是3.我们应该在exception后从头开始计数。

ValueOrException类似于函数式编程中的Either类型。 你刚刚交换了Left和Right。 按惯例,右用于成功,左用于失败。