在不重新评估序列的情况下获取IObservable中的上一个元素

IObservable序列中(在.NET的Reactive Extensions中),我想获取previous和current元素的值,以便我可以比较它们。 我在网上发现了一个类似于下面的例子来完成任务:

 sequence.Zip(sequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur }) 

它工作正常,除了它评估序列两次,我想避免。 您可以看到使用此代码对其进行了两次评估:

 var debugSequence = sequence.Do(item => Debug.WriteLine("Retrieved an element from sequence")); debugSequence.Zip(debugSequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur }).Subscribe(); 

输出显示的调试行数是序列中元素的两倍。

我理解为什么会发生这种情况,但到目前为止,我还没有找到一种不会对序列进行两次评估的替代方法。 如何只将一个序列评估与前一个和当前一起组合?

我认为有一个更好的解决方案,它使用Observable.Scan并避免双重订阅:

 public static IObservable> PairWithPrevious(this IObservable source) { return source.Scan( Tuple.Create(default(TSource), default(TSource)), (acc, current) => Tuple.Create(acc.Item2, current)); } 

我在我的博客上写了这篇文章: http : //www.zerobugbuild.com/?p = 213

附录

进一步修改允许您使用结果选择器更干净地使用任意类型:

 public static IObservable CombineWithPrevious( this IObservable source, Func resultSelector) { return source.Scan( Tuple.Create(default(TSource), default(TSource)), (previous, current) => Tuple.Create(previous.Item2, current)) .Select(t => resultSelector(t.Item1, t.Item2)); } 

评估两次是冷可观察的指标。 您可以使用.Publish()将其转换为热门:

 var pub = sequence.Publish(); pub.Zip(pub.Skip(1), (... pub.Connect(); 

@James World附录看起来很棒,如果没有Tuple<> ,我几乎总是不喜欢:“. Item1是以前的吗?还是当前的?我记不起来了。选择器的第一个参数是什么? ,是上一个项目吗?

对于那部分,我喜欢@dcstraw定义的专用ItemWithPrevious 。 那么你去吧,把两者放在一起(希望我之前没有和当前混在一起)和一些重命名和设施:

 public static class ObservableExtensions { public static IObservable> CombineWithPrevious( this IObservable source, TSource initialValue = default(TSource)) { var seed = SortedPair.Create(initialValue, initialValue); return source.Scan(seed, (acc, current) => SortedPair.Create(current, acc.Current)); } public static IObservable CombineWithPrevious( this IObservable source, Func, TResult> resultSelector, TSource initialValue = default(TSource)) { var seed = SortedPair.Create(initialValue, initialValue); return source .Scan(seed, (acc, current) => SortedPair.Create(current, acc.Current)) .Select(p => resultSelector(p)); } } public class SortedPair { public SortedPair(T current, T previous) { Current = current; Previous = previous; } public SortedPair(T current) : this(current, default(T)) { } public SortedPair() : this(default(T), default(T)) { } public T Current; public T Previous; } public class SortedPair { public static SortedPair Create(T current, T previous) { return new SortedPair(current, previous); } public static SortedPair Create(T current) { return new SortedPair(current); } public static SortedPair Create() { return new SortedPair(); } } 

如果您只需要在订阅期间访问前一个元素,这可能是最简单的方法。 (我确定有更好的方法,也许是IObservable上的缓冲区操作符?目前文档很稀疏,所以我真的不能告诉你。)

  EventArgs prev = null; sequence.Subscribe(curr => { if (prev != null) { // Previous and current element available here } prev = curr; }); 

EventArgs只是事件参数类型的替身。

事实certificate,您可以使用变量来保存先前的值并引用它并在IObservable扩展链中重新分配它。 这甚至可以在辅助方法中使用。 使用下面的代码,我现在可以在我的IObservable上调用CombineWithPrevious()来获取对前一个值的引用,而无需重新评估序列。

 public class ItemWithPrevious { public T Previous; public T Current; } public static class MyExtensions { public static IObservable> CombineWithPrevious(this IObservable source) { var previous = default(T); return source .Select(t => new ItemWithPrevious { Previous = previous, Current = t }) .Do(items => previous = items.Current); } }