使用Reactive Extensions搜索TextChanged

我试图在具有10000多条记录的数据库表上实现即时搜索。

搜索文本框内的文本发生更改时搜索开始,当搜索框变空时,我想调用另一种加载所有数据的方法。

此外,如果用户更改搜索字符串,而正在加载另一个搜索的结果,则应停止加载这些结果以支持新搜索。

我像下面的代码一样实现它,但我想知道是否有更好或更清晰的方法来使用Rx(Reactive Extension)运算符,我觉得在第一个observable的subscribe方法中创建第二个observable比声明性的,对于if语句也是如此。

var searchStream = Observable.FromEventPattern(s => txtSearch.TextChanged += s, s => txtSearch.TextChanged -= s) .Throttle(TimeSpan.FromMilliseconds(300)) .Select(evt => { var txtbox = evt.Sender as TextBox; return txtbox.Text; } ); searchStream .DistinctUntilChanged() .ObserveOn(SynchronizationContext.Current) .Subscribe(searchTerm => { this.parties.Clear(); this.partyBindingSource.ResetBindings(false); long partyCount; var foundParties = string.IsNullOrEmpty(searchTerm) ? partyRepository.GetAll(out partyCount) : partyRepository.SearchByNameAndNotes(searchTerm); foundParties .ToObservable(Scheduler.Default) .TakeUntil(searchStream) .Buffer(500) .ObserveOn(SynchronizationContext.Current) .Subscribe(searchResults => { this.parties.AddRange(searchResults); this.partyBindingSource.ResetBindings(false); } , innerEx => { } , () => { } ); } , ex => { } , () => { } ); 

SearchByNameAndNotes方法只是通过从数据读取器读取数据,使用SQLite返回IEnumerable

我想你想要这样的东西。 编辑:从你的评论,我看到你有一个同步存储库API – 我将保留异步版本,然后添加一个同步版本。 内联注释:

异步存储库版本

异步存储库接口可能是这样的:

 public interface IPartyRepository { Task> GetAllAsync(out long partyCount); Task> SearchByNameAndNotesAsync(string searchTerm); } 

然后我将查询重构为:

 var searchStream = Observable.FromEventPattern( s => txtSearch.TextChanged += s, s => txtSearch.TextChanged -= s) .Select(evt => txtSearch.Text) // better to select on the UI thread .Throttle(TimeSpan.FromMilliseconds(300)) .DistinctUntilChanged() // placement of this is important to avoid races updating the UI .ObserveOn(SynchronizationContext.Current) .Do(_ => { // I like to use Do to make in-stream side-effects explicit this.parties.Clear(); this.partyBindingSource.ResetBindings(false); }) // This is "the money" part of the answer: // Don't subscribe, just project the search term // into the query... .Select(searchTerm => { long partyCount; var foundParties = string.IsNullOrEmpty(searchTerm) ? partyRepository.GetAllAsync(out partyCount) : partyRepository.SearchByNameAndNotesAsync(searchTerm); // I assume the intention of the Buffer was to load // the data into the UI in batches. If so, you can use Buffer from nuget // package Ix-Main like this to get IEnumerable batched up // without splitting it up into unit sized pieces first return foundParties // this ToObs gets us into the monad // and returns IObservable> .ToObservable() // the ToObs here gets us into the monad from // the IEnum> returned by Buffer // and the SelectMany flattens so the output // is IObservable> .SelectMany(x => x.Buffer(500).ToObservable()) // placement of this is again important to avoid races updating the UI // erroneously putting it after the Switch is a very common bug .ObserveOn(SynchronizationContext.Current); }) // At this point we have IObservable> // Switch flattens and returns the most recent inner IObservable, // cancelling any previous pending set of batched results // superceded due to a textbox change // ie the previous inner IObservable<...> if it was incomplete // - it's the equivalent of your TakeUntil, but a bit neater .Switch() .Subscribe(searchResults => { this.parties.AddRange(searchResults); this.partyBindingSource.ResetBindings(false); }, ex => { }, () => { }); 

同步存储库版本

同步存储库接口可能是这样的:

 public interface IPartyRepository { IEnumerable GetAll(out long partyCount); IEnumerable SearchByNameAndNotes(string searchTerm); } 

就个人而言,我不建议像这样同步存储库接口。 为什么? 它通常会执行IO,因此您将浪费地阻塞线程。

你可能会说客户端可以从后台线程调用,或者你可以将他们的调用包装在一个任务中 – 但这不是我想的正确方法。

  • 客户不“知道”你要阻止; 它没有在合同中表达出来
  • 它应该是处理实现的异步方面的存储库 – 毕竟, 如何最好地实现这一点只能由存储库实现者最好地了解。

无论如何,接受上述,实现的一种方式是这样的(当然它大部分类似于异步版本,所以我只注释了差异):

 var searchStream = Observable.FromEventPattern( s => txtSearch.TextChanged += s, s => txtSearch.TextChanged -= s) .Select(evt => txtSearch.Text) .Throttle(TimeSpan.FromMilliseconds(300)) .DistinctUntilChanged() .ObserveOn(SynchronizationContext.Current) .Do(_ => { this.parties.Clear(); this.partyBindingSource.ResetBindings(false); }) .Select(searchTerm => // Here we wrap the synchronous repository into an // async call. Note it's simply not enough to call // ToObservable(Scheduler.Default) on the enumerable // because this can actually still block up to the point that the // first result is yielded. Doing as we have here, // we guarantee the UI stays responsive Observable.Start(() => { long partyCount; var foundParties = string.IsNullOrEmpty(searchTerm) ? partyRepository.GetAll(out partyCount) : partyRepository.SearchByNameAndNotes(searchTerm); return foundParties; }) // Note you can supply a scheduler, default is Scheduler.Default .SelectMany(x => x.Buffer(500).ToObservable()) .ObserveOn(SynchronizationContext.Current)) .Switch() .Subscribe(searchResults => { this.parties.AddRange(searchResults); this.partyBindingSource.ResetBindings(false); }, ex => { }, () => { });