Rx和任务 – 在生成新任务时取消运行任务?

我有一个我想用Rx处理的用户交互场景。

该场景类似于规范“当用户停止输入,做一些工作”(通常,搜索用户到目前为止输入的内容)(1) – 但我还需要:

  • (2)只获得“做一些工作”单位的最新结果(见下文)
  • (3)当新的工作单元开始时, 取消正在进行的任何工作(在我的情况下,它是CPU密集型的)

对于(1)我使用IObservable作为用户事件,使用.Throttle()仅触发事件之间的暂停(“用户停止键入”)。

从那,我。选择.Select(_ => CreateMyTask(...).ToObservable())

这给了我一个IObservable<IObservable> ,其中每个内部observable包装一个任务。

为了得到(2)我最终应用.Switch()来获得最新工作单元的结果。

那么(3) – 取消待定任务?

如果我理解正确的话,只要有一个新的内部IObservable .Switch()方法就会订阅它,并取消订阅前一个(),使它们成为Dispose()
也许这可能以某种方式连线触发任务取消?

您可以使用Observable.FromAsync ,它将生成在观察者取消订阅时取消的标记:

 input.Throttle(...) .Select(_ => Observable.FromAsync(token => CreateMyTask(..., token))) .Switch() .Subscribe(...); 

这将为每个工作单元生成一个新令牌,并在每次Switch切换到新工作单时取消它。

你必须使用任务吗?

如果你很乐意与Observables一起工作,那么你可以自己做得很好。

尝试做这样的事情:

 var query = Observable.Create(o => { var cancelling = false; var cancel = Disposable.Create(() => { cancelling = true; }); var subscription = Observable.Start(() => { for (var i = 0; i < 100; i++) { Thread.Sleep(10); //1000 ms in total if (cancelling) { Console.WriteLine("Cancelled on {0}", i); return -1; } } Console.WriteLine("Done"); return 42; }).Subscribe(o); return new CompositeDisposable(cancel, subscription); }); 

这个observable正在使用Thread.Sleep(10);在for循环中做一些艰苦的工作Thread.Sleep(10); ,但是当处理了observable时,循环退出并且密集的CPU工作停止。 然后,您可以使用标准Rx DisposeSwitch取消正在进行的工作。

如果你想在一个方法中捆绑,那么试试这个:

 public static IObservable Start(Func, T> work) { return Observable.Create(o => { var cancelling = false; var cancel = Disposable .Create(() => cancelling = true); var subscription = Observable .Start(() => work(() => cancelling)) .Subscribe(o); return new CompositeDisposable(cancel, subscription); }); } 

然后用这样的函数调用它:

 Func, int> work = cancelling => { for (var i = 0; i < 100; i++) { Thread.Sleep(10); //1000 ms in total if (cancelling()) { Console.WriteLine("Cancelled on {0}", i); return -1; } } Console.WriteLine("Done"); return 42; }; 

这是我的代码certificate这有效:

 var disposable = ObservableEx .Start(work) .Subscribe(x => Console.WriteLine(x)); Thread.Sleep(500); disposable.Dispose(); 

我得到了“取消50”(有时候“取消51”)作为我的输出。