在反应流水线中执行TPL代码并通过测试调度程序控制执行

我很难理解以下测试不起作用的原因:

[Fact] public void repro() { var scheduler = new TestScheduler(); var count = 0; // this observable is a simplification of the system under test // I've just included it directly in the test for clarity // in reality it is NOT accessible from the test code - it is // an implementation detail of the system under test // but by passing in a TestScheduler to the sut, the test code // can theoretically control the execution of the pipeline // but per this question, that doesn't work when using FromAsync Observable .Return(1) .Select(i => Observable.FromAsync(Whatever)) .Concat() .ObserveOn(scheduler) .Subscribe(_ => Interlocked.Increment(ref count)); Assert.Equal(0, count); // this call initiates the observable pipeline, but does not // wait until the entire pipeline has been executed before // returning control to the caller // the question is: why? Rx knows I'm instigating an async task // as part of the pipeline (that's the point of the FromAsync // method), so why can't it still treat the pipeline atomically // when I call Start() on the scheduler? scheduler.Start(); // count is still zero at this point Assert.Equal(1, count); } private async Task Whatever() { await Task.Delay(100); return Unit.Default; } 

我正在尝试做的是每当一个observable滴答时运行一些异步代码(由Whatever() )。 重要的是,我希望这些调用排队。 更重要的是,我希望能够通过使用TestScheduler来控制管道的执行。

似乎调用scheduler.Start()正在启动Whatever()的执行,但它不会等到它完成。 如果我改变Whatever()以便它是同步的:

 private async Task Whatever() { //await Task.Delay(100); return Unit.Default; } 

然后测试通过了,但当然这违背了我想要达到的目的。 我可以想象在TestScheduler一个我可以等待的TestScheduler StartAsync()方法,但这不存在。

任何人都可以告诉我是否有一种方法可以促使反应流水线的执行并等待其完成,即使它包含异步调用?

让我把你的问题归结为它的基本要素:

有没有办法,使用TestScheduler ,执行一个反应式管道并等待它完成,即使它包含异步调用?

我应该事先警告你,这里没有快速简单的答案,没有可以部署的方便“技巧”。

异步调用和调度程序

要回答这个问题,我认为我们需要澄清一些问题。 上述问题中的术语“异步调用”似乎专门用于指代具有TaskTask签名的方法 – 即使用任务并行库(TPL)异步运行的方法。

这一点很重要,因为Reactive Extensions(Rx)采用不同的方法来处理异步操作。

在Rx中,并发的引入是通过调度程序来管理的,调度程序是一种实现IScheduler接口的类型。 任何引入并发的操作都应该使调度程序参数可用,以便调用者可以决定适当的调度程序。 核心图书馆盲目地坚持这一原则。 因此,例如, Delay允许指定调度程序,但不允许。

从源头可以看出, IScheduler提供了许多Schedule重载。 需要并发的操作使用这些来安排工作的执行。 究竟如何执行该工作完全推迟到调度程序。 这是调度程序抽象的强大function。

引入并发性的Rx操作通常提供允许省略调度程序的重载,并且在这种情况下选择合理的默认值。 这一点很重要,因为如果您希望通过使用TestScheduler来测试代码, TestScheduler 必须对引入并发性的所有操作使用TestScheduler 。 一种不允许这种情况的流氓方法可能会破坏您的测试工作。

TPL调度抽象

TPL有自己的抽象来处理并发: TaskScheduler 。 这个想法非常相似。 你可以在这里读到它。 。

两个抽象之间有两个非常重要的区别:

  • Rx调度程序具有他们自己的时间概念的第一类表示 – Now属性。 TPL调度程序没有。
  • 在TPL中使用自定义调度程序的情况要少得多,并且没有提供重载的相同最佳实践,以便为引入并发的方法(返回TaskTask )提供特定的TaskSchedulers 。 绝大多数Task返回方法都假定使用默认的TaskScheduler并且您无法选择运行工作的位置。

TestScheduler的动机

使用TestScheduler的动机通常是双重的:

  • 通过加快时间来消除“等待”操作的需要。
  • 检查事件是否发生在预期的时间点。

这种方式的工作方式完全取决于调度程序有自己的时间概念。 每次通过IScheduler安排操作 ,我们都会指定它何时必须执行 – 要么尽快执行,要么在将来的特定时间执行。 然后,调度程序将工作排队等待执行,并在达到指定时间(根据调度程序本身)时执行它。

当你在TestScheduler上调用Start时,它的工作原理是在执行时间为当前概念Now或之前清空所有操作的队列,然后将其时钟提前到下一个计划工作时间并重复直到其队列为空。

这允许巧妙的技巧,如能够测试操作永远不会导致事件! 如果使用实时这将是一项具有挑战性的任务,但是使用虚拟时间很容易 – 一旦调度程序队列完全为空,那么TestScheduler断定不会再发生任何事件 – 因为如果队列中没有任何内容,则没有任何内容那里安排进一步的任务。 事实上, Start正是在这一点上回归。 为此,显然必须在TestScheduler上安排所有要测量的并发操作。

一个自定义运算符,它不小心自行选择调度程序而不允许覆盖该选项,或者在没有时间概念的情况下使用自己的并发forms的操作(例如基于TPL的调用)将使得很难(如果不是不可能的话)通过TestScheduler控制执行。

如果您通过其他方式运行异步操作,那么明智地使用AdvanceByAdvanceToAdvanceBy方法可以让您与该外来并发源协调 – 但是这可以实现的程度取决于该外来程序所提供的控制权。资源。

在TPL的情况下,您确实知道任务何时完成 – 这允许在测试中使用等待和超时,因为这些可能是丑陋的。 通过使用TaskCompleteSources (TCS),您可以模拟任务并使用AdvanceTo来命中特定点并完成TCS,但这里没有一个简单的方法。 通常你只需要诉诸丑陋的等待和超时,因为你没有足够的控制外国并发。

Rx通常是自由线程的,并尽可能避免引入并发。 相反,Rx调用链中的不同操作很可能需要不同类型的调度程序抽象。 使用单个测试调度程序模拟调用链并不总是可行的。 当然,我有理由使用多个TestSchedulers来模拟一些复杂的场景 – 例如,使用DispatcherSchedulerTaskScheduler链有时需要复杂的协调,这意味着你不能简单地将它们的操作序列化到一个TestScheduler

我曾经研究过的一些项目已经强制要求使用Rx来实现所有并发,以避免这些问题。 这并不总是可行的,即使在这些情况下,一些TPL的使用通常也是不可避免的。

一个特别的痛点

Rx的一个特殊痛点让许多测试人员摸不着头脑,事实是TPL – > Rx转换系列引入了并发性。 例如ToObservableSelectMany的重载接受Task等不提供调度程序的重载,并且暗中强迫你离开TestScheduler线程,即使使用TCS进行TestScheduler也是如此。 对于单独测试导致的所有痛苦,我认为这是一个错误。 你可以在这里阅读所有相关内容 – 挖掘并找到Dave Sexton的建议修复,它提供了指定调度程序的重载,并且正在考虑包含它。 您可能想查看该拉取请求。

潜在的解决方法

如果您可以编辑代码以使用它,则可能使用以下帮助程序方法。 它将任务转换为将在TestScheduler上运行并在正确的虚拟时间完成的observable。

它安排在负责收集任务结果的TestScheduler上工作 – 在我们声明任务完成的虚拟时间。 工作本身会阻塞,直到任务结果可用 – 允许TPL任务运行多长时间,或者直到经过一定量的指定时间,在这种情况下抛出TimeoutException

阻止工作的效果意味着TestScheduler将不会超过任务的预期虚拟完成时间,直到任务实际完成为止。 这样,Rx链的其余部分可以在全速虚拟时间运行,我们只等待TPL任务,在任务完成虚拟时间暂停链的其余部分,同时发生这种情况。

至关重要的是,计划在基于任务的操作的开始虚拟时间和任务的所述结束虚拟时间之间运行的其他并发Rx操作不会被阻止,并且它们的虚拟完成时间将不受影响。

因此,将duration设置为您希望任务显示的虚拟时间长度。 然后,将在任务启动时的虚拟时间加上指定的持续时间收集结果。

timeout设置为允许任务执行的实际时间。 如果需要更长时间,则抛出超时exception:

 public static IObservable ToTestScheduledObseravble( this Task task, TestScheduler scheduler, TimeSpan duration, TimeSpan? timeout = null) { timeout = timeout ?? TimeSpan.FromSeconds(100); var subject = Subject.Synchronize(new AsyncSubject(), scheduler); scheduler.Schedule>(task, duration, (s, t) => { if (!task.Wait(timeout.Value)) { subject.OnError( new TimeoutException( "Task duration too long")); } else { switch (task.Status) { case TaskStatus.RanToCompletion: subject.OnNext(task.Result); subject.OnCompleted(); break; case TaskStatus.Faulted: subject.OnError(task.Exception.InnerException); break; case TaskStatus.Canceled: subject.OnError(new TaskCanceledException(task)); break; } } return Disposable.Empty; }); return subject.AsObservable(); } 

代码中的用法就像这样,你的断言将通过:

 Observable .Return(1) .Select(i => Whatever().ToTestScheduledObseravble( scheduler, TimeSpan.FromSeconds(1))) .Concat() .Subscribe(_ => Interlocked.Increment(ref count)); 

结论

总之,你没有错过任何方便的技巧。 您需要考虑Rx如何工作,以及TPL如何工作并决定是否:

  • 你避免混合TPL和Rx
  • 您模拟TPL和Rx之间的接口(使用TCS或类似的),因此您可以独立测试每个接口
  • 你生活在丑陋的等待和超时TestScheduler完全放弃了TestScheduler
  • 您将丑陋的等待和超时与TestScheduler混合在一起,为您的测试带来一些控制。

Noseratio更优雅的Rx编写此测试的方式。 您可以await observable获取其最后一个值。 与Count()结合使用变得微不足道。

请注意, TestScheduler在此示例中不用于任何目的。

 [Fact] public async Task repro() { var scheduler = new TestScheduler(); var countObs = Observable .Return(1) .Select(i => Observable.FromAsync(Whatever)) .Concat() //.ObserveOn(scheduler) // serves no purpose in this test .Count(); Assert.Equal(0, count); //scheduler.Start(); // serves no purpose in this test. var count = await countObs; Assert.Equal(1, count); } 

正如詹姆斯在上面提到的,你不能像你一样混合并发模型。 您可以使用TestScheduler从Rx中删除并发,但实际上从未通过Rx实际引入并发。 然而,您确实使用TPL引入并发(即Task.Delay(100) 。这将实际上在任务池线程上异步运行。因此,您的同步测试将在任务完成之前完成。

你可以改成这样的东西

 [Fact] public void repro() { var scheduler = new TestScheduler(); var count = 0; // this observable is a simplification of the system under test // I've just included it directly in the test for clarity // in reality it is NOT accessible from the test code - it is // an implementation detail of the system under test // but by passing in a TestScheduler to the sut, the test code // can theoretically control the execution of the pipeline // but per this question, that doesn't work when using FromAsync Observable .Return(1) .Select(_ => Observable.FromAsync(()=>Whatever(scheduler))) .Concat() .ObserveOn(scheduler) .Subscribe(_ => Interlocked.Increment(ref count)); Assert.Equal(0, count); // this call initiates the observable pipeline, but does not // wait until the entire pipeline has been executed before // returning control to the caller // the question is: why? Rx knows I'm instigating an async task // as part of the pipeline (that's the point of the FromAsync // method), so why can't it still treat the pipeline atomically // when I call Start() on the scheduler? scheduler.Start(); // count is still zero at this point Assert.Equal(1, count); } private async Task Whatever(IScheduler scheduler) { return await Observable.Timer(TimeSpan.FromMilliseconds(100), scheduler).Select(_=>Unit.Default).ToTask(); } 

或者,您需要将Whatever方法放在可以模拟出来进行测试的接口后面。 在这种情况下,你只需要你的Stub / Mock / Double返回上面的代码,即return await Observable.Timer(TimeSpan.FromMilliseconds(100), scheduler).Select(_=>Unit.Default).ToTask();