使用Rx Framework使用void AsyncMethod(Action 回调)模式进行异步调用

我已经看到很多关于如何在Rx框架中使用Observable.FromAsyncPattern()来简化异步调用的示例,但我使用的是不使用IAsyncResult的标准异步模式的接口BeginXXX / EndXXX(IAsyncResult) ,所以这对我不起作用。

我正在使用的库使用回调模式公开异步函数:

void GetAllObjects(Action<List> callback) 

在一个理想的世界里,我想转此:

 var isLoadingUsers = true; var isLoadingSystems = true; var isLoadingCustomers = true; var isLoadingRules = true; mClient.GetAllUsers(UsersCallback); mClient.GetAllCustomers(CustomersCallback); mClient.GetAllRules(RulesCallback); // set the IsLoadingXXX variables to false in callbacks // once all are false mClient.GetAllSystems(SystemsCallback); 

变成这样的东西:

 var o = Observable.ForkJoin( Observable.Start(GetAllUsers()), Observable.Start(GetAllCustomers()), Observable.Start(GetAllRules()) ).Finally(() => GetAllSystems); 

如何将该模式转变为返回IObservable的模式?

 Func> FromListCallbackPattern(Action>> function) { return () => { // We use a ReplaySubject so that if people subscribe *after* the // real method finishes, they'll still get all the items ret = new ReplaySubject(); function((list) => { // We're going to "rebroadcast" the list onto the Subject // This isn't the most Rx'iest way to do this, but it is the most // comprehensible :) foreach(var v in list) { ret.OnNext(v); } ret.OnCompleted(); }); return ret; }; } 

现在,您可以执行以下操作:

 var getAllUsers = FromListCallbackPattern(mClient.GetAllUsers); getAllUsers().Subscribe(x => /* ... */); 

试试Observable.Create() ,也许是这样的:

 public IObservable ObserveAllObjects() { return Observable.Create( observer => () => GetAllObjects(objects => objects.ForEach(o => observer.OnNext(o)))); } 

我喜欢Observable.Create,但@dahlbyk答案不正确(错过完成并在取消订阅处理程序中执行操作)。 应该是这样的:

  IObservable> FromListCallbackPattern( Action>> listGetter) { return Observable .Create>(observer => { var subscribed = true; listGetter(list => { if (!subscribed) return; observer.OnNext(list); observer.OnCompleted(); }); return () => { subscribed = false; }; }); } 

此外,由于原始API完全返回整个列表,我没有理由过早地将其转换为可观察的。 让生成的observable也返回一个列表,如果调用者需要将其展平,他可以使用.SelectMany