Tag: subject

如何避免在RX中使用Subjects

所以我一直在读到使用Subject是“坏”的 – 并且我赞同这个推理。 但是,我试图想出避免使用它的最佳方法,并举一个例子。 目前,我有一个持久化配置类的抽象类,它有一个受保护的Save()方法,只要更改属性应该保持类,就会调用该方法。 此消息将消息泵送到Subject ,通过IObservable接口公开,序列化服务侦听并序列化该类。 这似乎是当时实现这一目标的最明显,最简单,最快捷的方式。 那么在不使用主题的情况下,RX方式会是什么呢? 我是否会公开一个事件并使用Observable.FromEventPattern()来订阅它? – 因为这似乎是一种更复杂的方式。

非重播热观察

原始问题 我有一个场景,我有多个IObservable序列,我想与Merge结合,然后听。 但是,如果其中一个产生错误,我不希望它崩溃其他流的所有内容,以及重新订阅序列(这是一个’永久’序列)。 我通过在合并之前向流添加Retry()来执行此操作,即: IEnumerable<IObservable> observables = GetObservables(); observables .Select(o => o.Retry()) .Merge() .Subscribe(/* Do subscription stuff */); 但是,当我想测试时会出现问题。 我想测试的是,如果observables一个IObservable产生一个OnError ,那么其他的仍应该能够发送它们的值并且它们应该被处理 我以为我只使用两个Subject代表两个observables IObservable ; 一个发送OnError(new Exception()) ,另一个发送OnNext(1) 。 但是,似乎Subject将重放新订阅的所有先前值(实际上是Retry() ),将测试转换为无限循环。 我尝试通过创建一个手动IObservable来解决它,该手动IObservable在第一个订阅上产生错误,之后是一个空序列,但它感觉很hacky: var i = 0; var nErrors = 2; var testErrorObservableWithOneErrorAndThenCompletion = Observable.Create(o => { i++; if (i < nErrors) { return Observable.Throw(new Exception()).Subscribe(o); } […]