我怎么能等待取消订阅后在Rx可观察序列中完成所有操作?

介绍

在我的WPF C#.NET应用程序中,我使用反应式扩展(Rx)来订阅事件,我经常需要从数据库重新加载一些内容来获取更新UI所需的值,因为事件对象通常只包含ID和一些元数据。

我使用Rx调度在后台加载数据并更新调度程序上的UI。 我在Rx序列中混合“Task.Run”时遇到了一些不好的经验(当使用“SelectMany”时,不再保证顺序,并且很难控制UnitTests中的调度)。 另请参阅: 在反应流水线中执行TPL代码并通过测试调度程序控制执行

我的问题

如果我关闭我的应用程序(或关闭选项卡)我想取消订阅然后等待DB调用(从Rx“Select”调用)仍然可以在“subscription.Dispose”之后运行。 到目前为止,我还没有找到任何好的实用工具或简单方法。

问题

是否有任何框架支持等待仍在Rx链中运行的所有内容

如果没有,你有什么好主意如何使一个易于使用的实用程序?

是否有任何好的替代方法来实现同样的目标?

public async Task AwaitEverythingInARxChain() { // In real life this is a hot observable event sequence which never completes IObservable eventSource = Enumerable.Range(1, int.MaxValue).ToObservable(); IDisposable subscription = eventSource // Load data in the background .ObserveOn(Scheduler.Default) .Select(id => LoadFromDatabase(id)) // Update UI on the dispatcher .ObserveOn(DispatcherScheduler.Current) .SubscribeOn(Scheduler.Default) // In real life the source produces the event values on a background thread. .Subscribe(loadedData => UpdateUi(loadedData)); Thread.Sleep(TimeSpan.FromSeconds(10)); // In real life I want to cancel (unsubscribe) here because the user has closed the Application or closed the tab and return a task which completes when everything is done. // Unsubscribe just guarantees that no "OnNext" is called anymore, but it doesn't wait until all operations in the sequence are finished (for example "LoadFromDatabase(id)" can still be runnig here. subscription.Dispose(); await ?; // I need to await here, so that i can be sure that no "LoadFromDatabase(id)" is running anymore. ShutDownDatabase(); } 

我已经尝试过(并没有工作)

  • 使用“Finally”运算符设置TaskCompletionSource的结果。 这种方法的问题:最后在取消订阅后直接调用,“LoadFromDatabase”仍然可以运行

更新:控制台输出和TakeUntil的示例

 public async Task Main() { Observable .Timer(TimeSpan.FromSeconds(5.0)) .Subscribe(x => { Console.WriteLine("Cancel started"); _shuttingDown.OnNext(Unit.Default); }); await AwaitEverythingInARxChain(); Console.WriteLine("Cancel finished"); ShutDownDatabase(); Thread.Sleep(TimeSpan.FromSeconds(3)); } private Subject _shuttingDown = new Subject(); public async Task AwaitEverythingInARxChain() { IObservable eventSource = Observable.Range(0, 10); await eventSource .ObserveOn(Scheduler.Default) .Select(id => LoadFromDatabase(id)) .ObserveOn(Scheduler.Default) .TakeUntil(_shuttingDown) .Do(loadedData => UpdateUi(loadedData)); } public int LoadFromDatabase(int x) { Console.WriteLine("Start LoadFromDatabase: " + x); Thread.Sleep(1000); Console.WriteLine("Finished LoadFromDatabase: " + x); return x; } public void UpdateUi(int x) { Console.WriteLine("UpdateUi: " + x); } public void ShutDownDatabase() { Console.WriteLine("ShutDownDatabase"); } 

输出(实际):

 Start LoadFromDatabase: 0 Finished LoadFromDatabase: 0 Start LoadFromDatabase: 1 UpdateUi: 0 Finished LoadFromDatabase: 1 Start LoadFromDatabase: 2 UpdateUi: 1 Finished LoadFromDatabase: 2 Start LoadFromDatabase: 3 UpdateUi: 2 Finished LoadFromDatabase: 3 Start LoadFromDatabase: 4 UpdateUi: 3 Cancel started Cancel finished ShutDownDatabase Finished LoadFromDatabase: 4 Start LoadFromDatabase: 5 Finished LoadFromDatabase: 5 Start LoadFromDatabase: 6 Finished LoadFromDatabase: 6 Start LoadFromDatabase: 7 

预计:我希望保证以下是最后的输出:

 Cancel finished ShutDownDatabase 

我终于找到了解决方案。 你可以使用TakeWhile来实现它。 TakeUntil不起作用,因为当第二个可观察序列产生第一个值时,主要的可观察序列立即完成。

以下是工作解决方案的示例:

  public async Task Main_Solution() { CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); Observable .Timer(TimeSpan.FromSeconds(4)) .Subscribe(x => { Console.WriteLine("Cancel startedthread='{0}'", Thread.CurrentThread.ManagedThreadId); cancellationTokenSource.Cancel(); }); await AwaitEverythingInARxChain(cancellationTokenSource.Token); Console.WriteLine("Cancel finished thread='{0}'", Thread.CurrentThread.ManagedThreadId); ShutDownDatabase(); Thread.Sleep(TimeSpan.FromSeconds(10)); } public async Task AwaitEverythingInARxChain(CancellationToken token) { IObservable eventSource = Observable.Range(0, 10); await eventSource .ObserveOn(Scheduler.Default) .Select(id => LoadFromDatabase(id)) .TakeWhile(_ => !token.IsCancellationRequested) .ObserveOn(Scheduler.Default) // Dispatcher in real life .Do(loadedData => UpdateUi(loadedData)).LastOrDefaultAsync(); } public int LoadFromDatabase(int x) { Console.WriteLine("Start LoadFromDatabase: {0} thread='{1}'", x, Thread.CurrentThread.ManagedThreadId); Thread.Sleep(TimeSpan.FromSeconds(3)); Console.WriteLine("Finished LoadFromDatabase: {0} thread='{1}'", x, Thread.CurrentThread.ManagedThreadId); return x; } public void UpdateUi(int x) { Console.WriteLine("UpdateUi: '{0}' thread='{1}'", x, Thread.CurrentThread.ManagedThreadId); } public void ShutDownDatabase() { Console.WriteLine("ShutDownDatabase thread='{0}'", Thread.CurrentThread.ManagedThreadId); } 

并输出:

 Start LoadFromDatabase: 0 thread='9' Finished LoadFromDatabase: 0 thread='9' Start LoadFromDatabase: 1 thread='9' UpdateUi: '0' thread='10' Cancel startedthread='4' Finished LoadFromDatabase: 1 thread='9' Cancel finished thread='10' ShutDownDatabase thread='10' 

请注意,“ShutDownDatabase”是最后一个输出(如预期的那样)。 它等待“LoadFromDatabase”完成第二个值,即使它的生成值没有被进一步处理。 这正是我想要的。

这比你想象的要容易。 你可以await观察。 所以简单地这样做:

 public async Task AwaitEverythingInARxChain() { IObservable eventSource = Enumerable.Range(1, 10).ToObservable(); await eventSource .ObserveOn(Scheduler.Default) .Select(id => LoadFromDatabase(id)) .ObserveOn(DispatcherScheduler.Current) .Do(loadedData => UpdateUi(loadedData), () => ShutDownDatabase()); } 

在您的方法中使用一些Console.WriteLine操作,并在db调用中hibernate一点线程以模拟网络延迟,我得到此输出:

 LoadFromDatabase:1
 LoadFromDatabase:2
 UpdateUi:1
 LoadFromDatabase:3
 UpdateUi:2
 LoadFromDatabase:4
 UpdateUi:3
 LoadFromDatabase:5
 UpdateUi:4
 LoadFromDatabase:6
 UpdateUi:5
 LoadFromDatabase:7
 UpdateUi:6
 LoadFromDatabase:8
 UpdateUi:7
 LoadFromDatabase:9
 UpdateUi:8
 LoadFromDatabase:10
 UpdateUi:9
 UpdateUi:10
 ShutDownDatabase

如果您需要结束查询,只需创建一个shuttingDown主题:

 private Subject _shuttingDown = new Subject(); 

…然后像这样修改查询:

  await eventSource .ObserveOn(Scheduler.Default) .Select(id => LoadFromDatabase(id)) .ObserveOn(DispatcherScheduler.Current) .Do( loadedData => UpdateUi(loadedData), () => ShutDownDatabase()) .TakeUntil(_shuttingDown); 

你只需要发出一个_shuttingDown.OnNext(Unit.Default); 取消订阅观察。


这是我完整的工作测试代码:

 async Task Main() { Observable .Timer(TimeSpan.FromSeconds(5.0)) .Subscribe(x => _shuttingDown.OnNext(Unit.Default)); await AwaitEverythingInARxChain(); } private Subject _shuttingDown = new Subject(); public async Task AwaitEverythingInARxChain() { IObservable eventSource = Observable.Range(0, 10); await eventSource .ObserveOn(Scheduler.Default) .Select(id => LoadFromDatabase(id)) .ObserveOn(DispatcherScheduler.Current) .Finally(() => ShutDownDatabase()) .TakeUntil(_shuttingDown) .Do(loadedData => UpdateUi(loadedData)); } public int LoadFromDatabase(int x) { Console.WriteLine("LoadFromDatabase: " + x); Thread.Sleep(1000); return x; } public void UpdateUi(int x) { Console.WriteLine("UpdateUi: " + x); } public void ShutDownDatabase() { Console.WriteLine("ShutDownDatabase"); } 

我得到这个输出:

 LoadFromDatabase:0
 LoadFromDatabase:1
 UpdateUi:0
 LoadFromDatabase:2
 UpdateUi:1
 LoadFromDatabase:3
 UpdateUi:2
 LoadFromDatabase:4
 UpdateUi:3
 LoadFromDatabase:5
 UpdateUi:4
 ShutDownDatabase

请注意,observable尝试在10秒内生成10个值,但OnNext会缩短它。

你需要等待一些东西。 您无法等待订阅处理。 最简单的方法是将处置逻辑转换为observable本身的一部分:

 var observable = eventSource // Load data in the background .ObserveOn(Scheduler.Default) .Select(id => LoadFromDatabase(id)) .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(10))) //This replaces your Thread.Sleep call .Publish() .RefCount(); var subscription = observable.ObserveOn(DispatcherScheduler.Current) .Subscribe(loadedData => UpdateUi(loadedData)); //do whatever you want here. await observable.LastOrDefault();