在不重新评估序列的情况下获取IObservable中的上一个元素
在IObservable
序列中(在.NET的Reactive Extensions中),我想获取previous和current元素的值,以便我可以比较它们。 我在网上发现了一个类似于下面的例子来完成任务:
sequence.Zip(sequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur })
它工作正常,除了它评估序列两次,我想避免。 您可以看到使用此代码对其进行了两次评估:
var debugSequence = sequence.Do(item => Debug.WriteLine("Retrieved an element from sequence")); debugSequence.Zip(debugSequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur }).Subscribe();
输出显示的调试行数是序列中元素的两倍。
我理解为什么会发生这种情况,但到目前为止,我还没有找到一种不会对序列进行两次评估的替代方法。 如何只将一个序列评估与前一个和当前一起组合?
我认为有一个更好的解决方案,它使用Observable.Scan并避免双重订阅:
public static IObservable> PairWithPrevious(this IObservable source) { return source.Scan( Tuple.Create(default(TSource), default(TSource)), (acc, current) => Tuple.Create(acc.Item2, current)); }
我在我的博客上写了这篇文章: http : //www.zerobugbuild.com/?p = 213
附录
进一步修改允许您使用结果选择器更干净地使用任意类型:
public static IObservable CombineWithPrevious( this IObservable source, Func resultSelector) { return source.Scan( Tuple.Create(default(TSource), default(TSource)), (previous, current) => Tuple.Create(previous.Item2, current)) .Select(t => resultSelector(t.Item1, t.Item2)); }
评估两次是冷可观察的指标。 您可以使用.Publish()将其转换为热门:
var pub = sequence.Publish(); pub.Zip(pub.Skip(1), (... pub.Connect();
@James World附录看起来很棒,如果没有Tuple<>
,我几乎总是不喜欢:“. Item1是以前的吗?还是当前的?我记不起来了。选择器的第一个参数是什么? ,是上一个项目吗? “
对于那部分,我喜欢@dcstraw定义的专用ItemWithPrevious
。 那么你去吧,把两者放在一起(希望我之前没有和当前混在一起)和一些重命名和设施:
public static class ObservableExtensions { public static IObservable> CombineWithPrevious( this IObservable source, TSource initialValue = default(TSource)) { var seed = SortedPair.Create(initialValue, initialValue); return source.Scan(seed, (acc, current) => SortedPair.Create(current, acc.Current)); } public static IObservable CombineWithPrevious( this IObservable source, Func, TResult> resultSelector, TSource initialValue = default(TSource)) { var seed = SortedPair.Create(initialValue, initialValue); return source .Scan(seed, (acc, current) => SortedPair.Create(current, acc.Current)) .Select(p => resultSelector(p)); } } public class SortedPair { public SortedPair(T current, T previous) { Current = current; Previous = previous; } public SortedPair(T current) : this(current, default(T)) { } public SortedPair() : this(default(T), default(T)) { } public T Current; public T Previous; } public class SortedPair { public static SortedPair Create (T current, T previous) { return new SortedPair (current, previous); } public static SortedPair Create (T current) { return new SortedPair (current); } public static SortedPair Create () { return new SortedPair (); } }
如果您只需要在订阅期间访问前一个元素,这可能是最简单的方法。 (我确定有更好的方法,也许是IObservable上的缓冲区操作符?目前文档很稀疏,所以我真的不能告诉你。)
EventArgs prev = null; sequence.Subscribe(curr => { if (prev != null) { // Previous and current element available here } prev = curr; });
EventArgs只是事件参数类型的替身。
事实certificate,您可以使用变量来保存先前的值并引用它并在IObservable
扩展链中重新分配它。 这甚至可以在辅助方法中使用。 使用下面的代码,我现在可以在我的IObservable
上调用CombineWithPrevious()
来获取对前一个值的引用,而无需重新评估序列。
public class ItemWithPrevious { public T Previous; public T Current; } public static class MyExtensions { public static IObservable> CombineWithPrevious (this IObservable source) { var previous = default(T); return source .Select(t => new ItemWithPrevious { Previous = previous, Current = t }) .Do(items => previous = items.Current); } }