等待IObservable以获取所有元素错误

我有这堂课:

public class TestService { public IObservable GetObservable(int max) { var subject = new Subject(); Task.Factory.StartNew(() => { for (int i = 0; i < max; i++) { subject.OnNext(i); } subject.OnCompleted(); }); return subject; } } 

我也为此写了一个测试方法:

 [TestMethod] public void TestServiceTest1() { var testService = new TestService(); var i = 0; var observable = testService.GetObservable(3); observable.Subscribe(_ => { i++; }); observable.Wait(); Assert.AreEqual(i, 3); } 

但有时我得到错误: Sequence在方法Wait() 中不包含任何元素

我建议在测试到达observable.Wait()之前完成我的IObservable。 我怎样才能避免这个错误?

在我看来,这段代码中的基本问题是IObservable 代表了如何观察某事的契约。

在这种情况下, GetObservable方法不只是返回一个契约,而是立即执行工作(使用TPL)。 如果您认为可以多次订阅同一个IObservable实例(实际上是在示例代码中发生,因为您第一次使用Subscribe而另一个SubscribeWait ),这没有任何意义。 这个单一的区别是我学习Rx的最大绊脚石。

我会改为实现这样的方法(并避免完全使用Subject<> ,因为它不是创建Observable的首选方法):

 public class TestService { public IObservable GetObservable(int max) { return Observable.Create((IObserver observer) => { for (int i = 0; i < max; i++) { observer.OnNext(i); } observer.OnCompleted(); }); } } 

Observable.Create也有一些有趣的改写,它们提供了更大的灵活性。