等待IObservable以获取所有元素错误
我有这堂课:
public class TestService { public IObservable GetObservable(int max) { var subject = new Subject(); Task.Factory.StartNew(() => { for (int i = 0; i < max; i++) { subject.OnNext(i); } subject.OnCompleted(); }); return subject; } }
我也为此写了一个测试方法:
[TestMethod] public void TestServiceTest1() { var testService = new TestService(); var i = 0; var observable = testService.GetObservable(3); observable.Subscribe(_ => { i++; }); observable.Wait(); Assert.AreEqual(i, 3); }
但有时我得到错误: Sequence在方法Wait() 中不包含任何元素 。
我建议在测试到达observable.Wait()之前完成我的IObservable。 我怎样才能避免这个错误?
在我看来,这段代码中的基本问题是IObservable
代表了如何观察某事的契约。
在这种情况下, GetObservable
方法不只是返回一个契约,而是立即执行工作(使用TPL)。 如果您认为可以多次订阅同一个IObservable
实例(实际上是在示例代码中发生,因为您第一次使用Subscribe
而另一个Subscribe
了Wait
),这没有任何意义。 这个单一的区别是我学习Rx的最大绊脚石。
我会改为实现这样的方法(并避免完全使用Subject<>
,因为它不是创建Observable的首选方法):
public class TestService { public IObservable GetObservable(int max) { return Observable.Create ((IObserver observer) => { for (int i = 0; i < max; i++) { observer.OnNext(i); } observer.OnCompleted(); }); } }
Observable.Create
也有一些有趣的改写,它们提供了更大的灵活性。