快速重复TakeWhile导致无限循环

在stream.DataAvailable为false之前,如何进行以下可观察重复? 目前它看起来永远不会停止。

在Defer部分内的AsyncReadChunk和Observable.Return使OnNext调用然后OnCompleted调用。 当Repeat接收OnNext调用时,它将它传递给TakeWhile。 当TakeWhile不满意时,它完成了observable,但我认为在OnNext之后的OnCompleted是如此之快以至于它使Repeat重新订阅了observable并导致无限循环。

我该如何纠正这种行为?

public static IObservable AsyncRead(this NetworkStream stream, int bufferSize) { return Observable.Defer(() => { try { return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0]); } catch (Exception) { return Observable.Return(new byte[0]); } }) .Repeat() .TakeWhile((dataChunk, index) => dataChunk.Length > 0); } 


自我回答:(以下是问题作者Samet发布的答案。但是,他将答案作为问题的一部分发布。我将其转移到一个单独的答案中,标记为社区维基,因为作者没有’他自己动了一下。)


我通过重构发现调度程序存在问题。 Return使用Immediate scheduler而Repeat使用CurrentThread。 固定代码如下。

  public static IObservable AsyncRead(this NetworkStream stream, int bufferSize) { return Observable.Defer(() => { try { return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0], Scheduler.CurrentThread); } catch (Exception) { return Observable.Return(new byte[0], Scheduler.CurrentThread); } }) .Repeat() .TakeWhile((dataChunk, index) => dataChunk.Length > 0); }