为什么这个Observable.Generate重载会导致内存泄漏?

在我的机器上大约10秒后,以下Rx.NET代码将消耗大约500 MB的内存。

var stream = Observable.Range(0, 10000) .SelectMany(i => Observable.Generate( 0, j => true, j => j + 1, j => new { N = j }, j => TimeSpan.FromMilliseconds(1))); stream.Subscribe(); 

如果我在没有Func参数的情况下使用Observable.Generate重载Func我的内存使用量平均为35 MB。

 var stream = Observable.Range(0, 10000) .SelectMany(i => Observable.Generate( 0, j => true, j => j + 1, j => new { N = j })); // j => TimeSpan.FromMilliseconds(1))); ** Removed! ** stream.Subscribe(); 

使用SelectMany()或Merge()扩展方法时似乎只是一个问题。

这是使用默认调度程序的问题。

使用TimeSpan版本,调度程序是DefaultScheduler 。 没有TimeSpan它是CurrentThreadScheduler

因此,对于基于时间的生成,它非常快速地尝试调度所有操作,并且基本上构建了等待执行的大量事件队列。 因此它使用了大量内存。

使用非基于时间的生成它使用当前线程,因此它将生成并消耗每个生成的值,从而使用非常少的内存。

哦,这不是内存泄漏。 如果您尝试以比可以消耗的速度更快的速度安排无限数量的值,那么这只是正常操作。


我反编译了代码以确定使用了哪些调度程序。

这是非基于时间的反编译:

 public static IObservable Generate(TState initialState, Func condition, Func iterate, Func resultSelector) { if (condition == null) throw new ArgumentNullException("condition"); if (iterate == null) throw new ArgumentNullException("iterate"); if (resultSelector == null) throw new ArgumentNullException("resultSelector"); return Observable.s_impl.Generate(initialState, condition, iterate, resultSelector); } public virtual IObservable Generate(TState initialState, Func condition, Func iterate, Func resultSelector) { return (IObservable)new Generate(initialState, condition, iterate, resultSelector, SchedulerDefaults.Iteration); } internal static IScheduler Iteration { get { return (IScheduler)CurrentThreadScheduler.Instance; } } 

上述方法分别来自ObservableQueryLanguageSchedulerDefaults