缓存数据库游标中的数据,同时保持UI响应

我有一个填充的数据库目录,以及一个可用于检索对象的游标。 这个目录显然可能非常大,我想做的是使用ReactiveUI缓冲数据,同时保持UI数据绑定和响应。 我按照这里的步骤将我的IEnumerable转换为IObservable ,如下所示:

 public class CatalogService { ... public IObservable DataSource { get { return Observable.Create(obs => { var cursor = Database.Instance.GetAllObjects(); var status = cursor.MoveToFirst(); while (status == DbStatus.OK) { var dbObject= Db.Create(cursor); obs.OnNext(dbObject); status = cursor.MoveToNext(); } obs.OnCompleted(); return Disposable.Empty; }); } } } 

在我的视图类(特别是Loaded事件)中,我订阅数据源并使用缓冲区方法,希望保持UI响应。

  public ObservableCollection DbObjects { get; set; } private async void OnLoad(object sender, RoutedEventArgs e) { var observableData = CatalogService.Instance.DataSource.Publish(); var chunked = observableData.Buffer(TimeSpan.FromMilliseconds(100)); var dispatcherObs = chunked.ObserveOnDispatcher(DispatcherPriority.Background); dispatcherObs.Subscribe(dbObjects => { foreach (var dbObject in dbObjects) { DbObjects.Add(dbObject); } }); await Task.Run(() => observableData.Connect()); await dispatcherObs.ToTask(); } 

不幸的是,结果恰恰相反。 当我的视图控件(包含一个绑定到DbObjects属性的简单ListBox )加载时,在枚举整个目录之前,它不会显示任何数据。 只有这样才能刷新UI。

我是ReactiveUI的新手,但我确信它能胜任手头的任务。 如果我使用不正确,有没有人有任何建议或指示?

在进一步的信息之前,我的猜测是你可能有几个零长度缓冲区,具体取决于数据库查询需要多长时间,然后是一个包含所有结果的非零长度缓冲区。 您可能最好不要根据长度和时间来限制缓冲区大小。

编辑 – 我只是想分析原始实现中涉及的各种线程。 我不同意Paul的分析,我不相信UI线程因数据库查询而被阻止。 我认为由于大量结果被缓冲而被阻止。

查理 – 拜托,你能用代码(而不是调试器)计算数据库查询并转储你得到的缓冲区长度。

我将注释代码以显示所涉及的所有三个线程的顺序:

首先,在提供的代码之外,我假设通过Loaded事件调用OnLoad

(1) – UI线程调用OnLoad

 public ObservableCollection DbObjects { get; set; } private async void OnLoad(object sender, RoutedEventArgs e) { // (2) UI Thread enters OnLoad var observableData = CatalogService.Instance.DataSource.Publish(); var chunked = observableData // (6) Thread A OnNext passes into Buffer .Buffer(TimeSpan.FromMilliseconds(100)); // (7) Thread B, threadpool thread used by Buffer to run timer var dispatcherObs = chunked // (8) Thread B still .ObserveOnDispatcher(DispatcherPriority.Background); // (9) Non blocking OnNexts back to UI Thread dispatcherObs.Subscribe(dbObjects => { // (10) UI Thread receives buffered dbObjects foreach (var dbObject in dbObjects) { // (11) UI Thread hurting while all these images are // stuffed in the collection in one go - This is the issue I bet. DbObjects.Add(dbObject); } }); await Task.Run(() => { // (3) Thread A - a threadpool thread, // triggers subscription to DataSource // UI Thread is *NOT BLOCKED* due to await observableData.Connect() }); // (13) UI Thread - Dispatcher call back here at end of Create call // BUT UI THREAD WAS NOT BLOCKED!!! // (14) UI Thread - This task will be already completed // It is causing a second subscription to the already completed published observable await dispatcherObs.ToTask(); } public class CatalogService { ... public IObservable DataSource { get { return Observable.Create(obs => { // (4) Thread A runs Database query synchronously var cursor = Database.Instance.GetAllObjects(); var status = cursor.MoveToFirst(); while (status == DbStatus.OK) { var dbObject= Db.Create(cursor); // (5) Thread A call OnNext obs.OnNext(dbObject); status = cursor.MoveToNext(); } obs.OnCompleted(); // (12) Thread A finally completes subscription due to Connect() return Disposable.Empty; }); } } } 

我认为问题是一个大缓冲区,一次性将大量结果卸载到ObservableCollection中,为列表框创建了大量工作。

你的问题在这里:

  while (status == DbStatus.OK) { var dbObject= Db.Create(cursor); obs.OnNext(dbObject); status = cursor.MoveToNext(); } 

一旦有人订阅,该循环就会以阻塞的方式同步运行。 由于您在UI线程上创建订阅(在您调用Connect时),它将在UI线程上运行整个事务。 将其更改为:

 return Observable.Create(obs => { Observable.Start(() => { var cursor = Database.Instance.GetAllObjects(); var status = cursor.MoveToFirst(); while (status == DbStatus.OK) { var dbObject= Db.Create(cursor); obs.OnNext(dbObject); status = cursor.MoveToNext(); } obs.OnCompleted(); }, RxApp.TaskPoolScheduler); return Disposable.Empty; });