使用反应式扩展以正确的顺序处理多个响应
情况
我有一个系统,其中一个请求产生两个响应。 请求和响应具有相应的可观察量:
IObservable _requests; IObservable _mainResponses; IObservable _secondaryResponses;
保证RequestSent
事件早于MainResponseReceived
和SecondaryResponseReceived
发生,但响应按随机顺序排列。
是)我有的
最初我想要处理两个响应的处理程序,所以我压缩了observables:
_requests .SelectMany(async request => { var main = _mainResponses.FirstAsync(m => m.Id == request.Id); var secondary = _secondaryResponses.FirstAsync(s => s.Id == request.Id); var zippedResponse = main.Zip(secondary, (m, s) => new MainAndSecondaryResponseReceived { Request = request, Main = m, Secondary = s }); return await zippedResponse.FirstAsync(); ; }) .Subscribe(OnMainAndSecondaryResponseReceived);
我需要的
现在我需要处理MainResponseReceived
而不等待SecondaryResponseRecieved并且必须保证,OnMainResponseRecieved在调用OnMainAndSecondaryResponseReceived之前完成
请问如何定义这两个订阅?
测试案例1:
- 发生
RequestSent
- 发生
MainResponseReceived
– >调用OnMainResponseReceived - 发生
SecondaryResponseReceive
d – >调用OnMainAndSecondaryResponseReceived
测试案例2:
- 发生
RequestSent
- 发生
SecondaryResponseReceived
-
MainResponseReceived occurs
– >调用MainResponseReceived occurs
– >调用OnMainAndSecondaryResponseReceived
我认为你几乎走在正确的轨道上。 我会停止使用所有Async的东西 – 这只是让事情变得复杂。
试试这个查询:
var query = _requests .SelectMany(request => _mainResponses.Where(m => m.Id == request.Id).Take(1) .Do(m => OnMainResponseReceived(m)) .Zip( _secondaryResponses.Where(s => s.Id == request.Id).Take(1), (m, s) => new MainAndSecondaryResponseReceived() { Request = request, Main = m, Secondary = s })); var subscription = query.Subscribe(x => OnMainAndSecondaryResponseReceived(x));
.Do(...)
是代码中重要的缺失部分。 它确保在OnMainResponseReceived
之前调用OnMainAndSecondaryResponseReceived
无论主要或次要响应是否首先出现。
我测试了这个:
Subject _requestsSubject = new Subject (); Subject _mainResponsesSubject = new Subject (); Subject _secondaryResponsesSubject = new Subject (); IObservable _requests = _requestsSubject.AsObservable(); IObservable _mainResponses = _mainResponsesSubject.AsObservable(); IObservable _secondaryResponses = _secondaryResponsesSubject.AsObservable(); _requestsSubject.OnNext(new RequestSent() { Id = 42 }); _mainResponsesSubject.OnNext(new MainResponseReceived() { Id = 42 }); _secondaryResponsesSubject.OnNext(new SecondResponseReceived() { Id = 42 }); _requestsSubject.OnNext(new RequestSent() { Id = 99 }); _mainResponsesSubject.OnNext(new MainResponseReceived() { Id = 99 }); _secondaryResponsesSubject.OnNext(new SecondResponseReceived() { Id = 99 });