为什么Observable.Generate()抛出System.StackOverflowException?

我正在编写一个C#(.NET 4.5)应用程序,用于聚合基于时间的事件以进行报告。 为了使我的查询逻辑可以重用于实时和历史数据,我使用了Reactive Extensions(2.0)及其IScheduler基础结构( HistoricalScheduler和friends)。

例如,假设我们创建了一个事件列表(按时间顺序排序,但它们可能重合!),其唯一的有效负载是它们的时间戳,并希望知道它们在固定持续时间的缓冲区中的分布:

 const int num = 100000; const int dist = 10; var events = new List(); var curr = DateTimeOffset.Now; var gap = new Random(); var time = new HistoricalScheduler(curr); for (int i = 0; i < num; i++) { events.Add(curr); curr += TimeSpan.FromMilliseconds(gap.Next(dist)); } var stream = Observable.Generate( 0, s => s  s + 1, s => events[s], s => events[s], time); stream.Buffer(TimeSpan.FromMilliseconds(num), time) .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count)); time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist)); 

运行此代码会导致System.StackOverflowException具有以下堆栈跟踪(最后3行一直向下):

 mscorlib.dll!System.Threading.Interlocked.Exchange(ref System.IDisposable location1, System.IDisposable value) + 0x3d bytes System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x37 bytes System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem.Cancel() + 0x23 bytes ... System.Reactive.Core.dll!System.Reactive.Disposables.AnonymousDisposable.Dispose() + 0x4d bytes System.Reactive.Core.dll!System.Reactive.Disposables.SingleAssignmentDisposable.Dispose() + 0x4f bytes System.Reactive.Core.dll!System.Reactive.Concurrency.ScheduledItem.Cancel() + 0x23 bytes ... 

好吧,问题似乎来自我使用Observable.Generate() ,具体取决于列表大小( num )和调度程序的选择。

我究竟做错了什么? 或者更一般地说,从提供自己的时间戳的IEnumerable事件创建IObservable的首选方法是什么?

(更新 – 意识到我没有提供替代方案:请参阅答案底部)

问题在于Observable.Generate工作方式 – 它用于展开基于参数的corecursive (认为​​递归内部转换)生成器; 如果这些参数最终生成一个非常嵌套的corecursive生成器,那么你将会破坏你的堆栈。

从这一点开始, 我猜测了很多(在我面前没有Rx源) (见下文),但我愿意打赌你的定义最终扩展到如下:

 initial_state => generate_next(initial_state) => generate_next(generate_next(initial_state)) => generate_next(generate_next(generate_next(initial_state))) => generate_next(generate_next(generate_next(generate_next(initial_state)))) => ... 

并且一直持续到你的调用堆栈变得足够大以至于溢出。 比方说,一个方法签名+你的int计数器,每次递归调用就像8-16个字节(更多取决于状态机生成器的实现方式),所以60,000个声音是正确的(1M / 16~62500最大值)深度)

编辑:拉出源 – 确认:Generate的“Run”方法看起来像这样 – 记下对Generate的嵌套调用:

 protected override IDisposable Run( IObserver observer, IDisposable cancel, Action setSink) { if (this._timeSelectorA != null) { Generate.α α = new Generate.α( (Generate) this, observer, cancel); setSink(α); return α.Run(); } if (this._timeSelectorR != null) { Generate.δ δ = new Generate.δ( (Generate) this, observer, cancel); setSink(δ); return δ.Run(); } Generate._ _ = new Generate._( (Generate) this, observer, cancel); setSink(_); return _.Run(); } 

编辑:Derp,没有提供任何替代方案……这里可能有用:

(编辑:修复了Enumerable.Range ,因此流大小不会乘以chunkSize

 const int num = 160000; const int dist = 10; var events = new List(); var curr = DateTimeOffset.Now; var gap = new Random(); var time = new HistoricalScheduler(curr); for (int i = 0; i < num; i++) { events.Add(curr); curr += TimeSpan.FromMilliseconds(gap.Next(dist)); } // Size too big? Fine, we'll chunk it up! const int chunkSize = 10000; var numberOfChunks = events.Count / chunkSize; // Generate a whole mess of streams based on start/end indices var streams = from chunkIndex in Enumerable.Range(0, (int)Math.Ceiling((double)events.Count / chunkSize) - 1) let startIdx = chunkIndex * chunkSize let endIdx = Math.Min(events.Count, startIdx + chunkSize) select Observable.Generate( startIdx, s => s < endIdx, s => s + 1, s => events[s], s => events[s], time); // E pluribus streamum var stream = Observable.Concat(streams); stream.Buffer(TimeSpan.FromMilliseconds(num), time) .Subscribe(l => Console.WriteLine(time.Now + ": " + l.Count)); time.AdvanceBy(TimeSpan.FromMilliseconds(num * dist)); 

好吧,我采用了一种不同的工厂方法,不需要lamdba表达式作为状态转换,现在我再也看不到任何堆栈溢出了。 我还不确定这是否符合我的问题的正确答案,但它确实有效,我想我会在这里分享:

 var stream = Observable.Create(o => { foreach (var e in events) { time.Schedule(e, () => o.OnNext(e)); } time.Schedule(events[events.Count - 1], () => o.OnCompleted()); return Disposable.Empty; }); 

在(!)返回订阅之前手动调度事件对我来说似乎很尴尬,但在这种情况下,它可以在lambda表达式中完成。

如果这种方法有任何不妥之处,请纠正我。 另外,我仍然很高兴听到System.Reactive隐含假设我违反了原始代码。

(哦,我的,我应该先检查一下:使用RX v1.0,原来的Observable.Generate()确实看起来有效!)