带有TimeSpan选择器的Observable.Generate似乎泄漏内存

我正在研究使用Observable.Generate创建一系列结果,这些结果是以msdn网站上的示例为起点,每隔一段时间采样一次。

以下代码没有TimeSpan选择器不会出现内存泄漏:

IObservable obs = Observable.Generate(initialState: 1, condition: x => x  x + 1, resultSelector: x => x.ToString()); obs.Subscribe(x => Console.WriteLine(x)); 

但是,以下代码与TimeSpan选择器显示内存泄漏:

 TimeSpan timeSpan = TimeSpan.FromSeconds(1); IObservable obs = Observable.Generate(initialState: 1, condition: x => x  x + 1, resultSelector: x => x.ToString(), timeSelector: x => timeSpan); obs.Subscribe(x => Console.WriteLine(x)); 

例如,这个玩具应用程序将使用VS 2015社区附带的Memory Profiler快速显示内存泄漏:

 using System; using System.Reactive.Linq; namespace Sample { public class Program { static void Main() { IObservable obs = Observable.Generate(1, x => x  x + 1, x => x.ToString(), x => TimeSpan.FromMilliseconds(500)); obs.Subscribe(x => { /*Do nothing but simply run the observable*/ }); Console.ReadLine(); } } } 

内存泄漏是一个不断增长的集合:

 System.Reactive.Disposables StableCompositeDisposable.Binary System.Reactive.Disposables SingleAssignmentDisposable 

我错误地使用此API吗? 我应该期待内存增长还是Reactive的错误?

对我来说这看起来像是一个错误 – 或者至少在DefaultScheduler的“递归”调度实现中出现凌乱/不良行为(它不是真正的递归,我说的是调度程序本身传递给计划操作的重载,所以你可以安排继续)。

您看到的一次性用品是通过调用DefaultScheduler.Schedule方法创建的(第71行: https : //github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System .Reactive.Core / Reactive / Concurrency / DefaultScheduler.cs )。

有几个原因可以解释为什么其他尝试发现这种情况失败了。 首先,一次性用品ARE最终处理 – 但只有当Generate OnCompletesOnErrors ,Generate在订阅时返回System.Reactive.AnonymousSafeObserver才会清理它。

其次,如果你使用一个短的TimeSpan (记住.NET Timer最小分辨率是15ms),那么Rx将优化使用计时器并调用QueueUserWorkItem而不使用计时器,因此不会创建这些一次性用品。

如果你深入了解Generate的实现( https://github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Generate.cs )你可以看到它传递了初始调用Schedule返回的IDisposable ,将它传递给观察者,观察者挂起它直到错误/完成。 这可以防止整个结果的递归调用链被收集 – 并且意味着如果您确实需要取消,或者当清理发生时,只有这样才能处理每个预定的操作的一次性调用。

您可以在下面直接使用DefaultScheduler的代码中看到相同的效果 – 在最后一行中cancel的引用足以导致泄漏。 确保使用发布版本,否则编译器将保持取消,直到方法结束为止。

 // ensure you are using a release build of this code ManualResetEvent mre = new ManualResetEvent(); IDisposable cancel; int maxCount = 20; TimeSpan timeSpan = TimeSpan.FromSeconds(1); Func recurse = null; recurse = (self, state) => { Console.WriteLine(state); if (state == maxCount) { mre.Set(); return Disposable.Empty; } return self.Schedule(state + 1, timeSpan, recurse); }; cancel = Scheduler.Default.Schedule(1, timeSpan, recurse); mre.WaitOne(); // uncomment the following line, and you'll get the same leak // leave it commented, and cancel reference is GC'd early and there's no leak // if(cancel == null) Console.WriteLine("Hang on to cancel"); 

我使用Jetbrains dotMemory API来获取内存转储以得出结论 – 我已经删除了那些API调用的代码,但如果你有这个产品,那么这里有一个完整的要点,你将能够看到最后一行清楚地说明了最后一行: https : //gist.github.com/james-world/f20377ea610fb8fc0ee811d27f7a837c或者,您可以使用MS Profiler API – 我目前没有将其分页到我的大脑工作集中!