当缓冲区不包含任何项目时,为什么Rx缓冲区连续执行方法?

我有一个Rx Observable充当缓冲区。 现在,当它获得10个项目时,或者在100毫秒之后,以先到者为准,它在订阅中执行该方法。

我注意到我的方法每100毫秒不断被调用,即使缓冲区中没有项目,这让我感到惊讶。 如果它没有收到缓冲区中的任何项目,那么只需让我的方法立即返回就足够了,但我觉得很奇怪它只是在背景中像这样生长。

为什么是这样? 你怎么建议我最好处​​理这个? 我是Rx的完全新手,所以也许我正在做一些奇怪的事情。 这是我的代码的简化版本:

private Subject<KeyValuePair<int, Action>> serverRequests; public MyBufferClass(IMyServer server, IScheduler scheduler) { this.serverRequests = new Subject<KeyValuePair<int, Action>>(); this.serverRequests .Buffer(TimeSpan.FromMilliseconds(100), 10, scheduler) .Subscribe(buffer => GetMultipleItemsFromServer(buffer)); } public void GetSingleItemFromServer(int id, Action callback) { this.serverRequests.OnNext(new KeyValuePair<int, Action>(id, callback)); } public void GetMultipleItemsFromServer(IEnumerable<KeyValuePair<int, Action>> idsWithCallbacks) { if (idsWithCallbacks.IsNullOrEmpty()) return; this.server.GetMultipleItems(idsWithCallbacks) } 

在我的测试中,如果我调用GetSingleItemFromServer 5次然后将我的TestScheduler推进1000毫秒,我认为GetMultipleItemsFromServer只会调用一次,但它会被调用10次。

在这种情况下,一个优雅的解决方案是在Buffer之后直接使用Where运算符来过滤掉任何空结果。 像这样的东西:

  stream .Buffer (...) .Where (x => x.Any()) .Subscribe (x => {...}, ex => {...}); 

至于为什么Buffer这样做,我想最好是展示一个空集合并让消费者选择如何处理它,而不是吞下它并否认这个机会。

另外,我不会在订阅块中调用您的服务器。 我认为将任何异步操作作为Rx流组合本身的一部分更好,并将Subscribe操作限制为处理最终结果的任何轻量级操作,即更新UI,记录成功/失败等等。像这样:

 (from request in serverRequests .Buffer (TimeSpan.FromMinutes (1)) .Where (x => x.Any()) from response in Observable.Start(server.GetMultipleItems(...)) select response) .Subscribe (x => {}, ex => {}); 

优点包括:

– 可以在服务器调用上使用更多Rx运算符,例如Timeout(),Retry(),Catch()等。

– 能够处理Subscribe()重载中的任何管道错误

– 使用SubscribeOn()/ ObserveOn()独立调度管道和Subscribe操作。

也许尝试这样:

 public MyBufferClass(IMyServer server, IScheduler scheduler) { this.serverRequests = new Subject>>(); this.serverRequests .GroupByUntil(x => 1, x => Observable.Timer(TimeSpan.FromMilliseconds(1000))) .SelectMany(x => x.ToArray()) .Subscribe(buffer => GetMultipleItemsFromServer(buffer)); } 

这不会给你空洞的结果。

关于.Buffer(...)问题的答案 – 这就是它的设计方式。 没有比这更复杂的了。