Rx – 如何从Task 创建IObservable ,以便取消订阅取消任务?

我是Rx的新手所以请耐心等待。

我想在IObservable包装一个Task IObservable 。 到现在为止还挺好:

 Task task = Task.Factory.StartNew(...); IObservable obs = task.ToObservable(); 

现在,我想要的是在观察者取消订阅时发出取消信号:

 var cancel = new CancellationToken(); Task task = Task.Factory.StartNew(..., cancel); IObservable obs = task.ToObservable(); //there should be a way to tie the cancel token //to the IObservable (?) IDisposable disposable = obs.Subscribe(...); Thread.Sleep(1000); disposable.Dispose(); // this should signal the task to cancel 

我怎么做?

FWIW这里是生成此切线的场景: Rx和任务 – 在生成新任务时取消运行任务?

这是我能想到的最简单的方法,使用Observable.Create

 static IObservable SomeRxWork() { return Observable.Create(o => { CancellationTokenSource cts = new CancellationTokenSource(); IDisposable sub = SomeAsyncWork(cts.Token).ToObservable().Subscribe(o); return new CompositeDisposable(sub, new CancellationDisposable(cts)); }); } static Task SomeAsyncWork(CancellationToken token); 

我在评论中暗示的最初方式实际上相当冗长:

 static IObservable SomeRxWork() { return Observable.Create(async (o, token) => { try { o.OnNext(await SomeAsyncWork(token)); o.OnCompleted(); } catch (OperationCanceledException) { } catch (Exception ex) { o.OnError(ex); } }); } 

假设您有这样的方法:

 Task GetGizmoAsync(int id, CancellationToken cancellationToken); 

您可以将其转换为IObservable ,其中订阅启动Task ,取消订阅通过使用以下内容取消它。

 IObservable observable = Observable.FromAsync( cancellationToken => GetGizmoAsync(7, cancellationToken)); // Starts the task: IDisposable subscription = observable.Subscribe(...); // Cancels the task if it is still running: subscription.Dispose();