Rx – 如何从Task 创建IObservable ,以便取消订阅取消任务?
我是Rx的新手所以请耐心等待。
我想在IObservable
包装一个Task
IObservable
。 到现在为止还挺好:
Task task = Task.Factory.StartNew(...); IObservable obs = task.ToObservable();
现在,我想要的是在观察者取消订阅时发出取消信号:
var cancel = new CancellationToken(); Task task = Task.Factory.StartNew(..., cancel); IObservable obs = task.ToObservable(); //there should be a way to tie the cancel token //to the IObservable (?) IDisposable disposable = obs.Subscribe(...); Thread.Sleep(1000); disposable.Dispose(); // this should signal the task to cancel
我怎么做?
FWIW这里是生成此切线的场景: Rx和任务 – 在生成新任务时取消运行任务?
这是我能想到的最简单的方法,使用Observable.Create
:
static IObservable SomeRxWork() { return Observable.Create (o => { CancellationTokenSource cts = new CancellationTokenSource(); IDisposable sub = SomeAsyncWork(cts.Token).ToObservable().Subscribe(o); return new CompositeDisposable(sub, new CancellationDisposable(cts)); }); } static Task SomeAsyncWork(CancellationToken token);
我在评论中暗示的最初方式实际上相当冗长:
static IObservable SomeRxWork() { return Observable.Create (async (o, token) => { try { o.OnNext(await SomeAsyncWork(token)); o.OnCompleted(); } catch (OperationCanceledException) { } catch (Exception ex) { o.OnError(ex); } }); }
假设您有这样的方法:
Task GetGizmoAsync(int id, CancellationToken cancellationToken);
您可以将其转换为IObservable
,其中订阅启动Task
,取消订阅通过使用以下内容取消它。
IObservable observable = Observable.FromAsync( cancellationToken => GetGizmoAsync(7, cancellationToken)); // Starts the task: IDisposable subscription = observable.Subscribe(...); // Cancels the task if it is still running: subscription.Dispose();