RX Observable.TakeWhile在每个元素之前检查条件,但我需要在之后执行检查

Observable.TakeWhile允许您在条件为真时运行序列(使用委托,以便我们可以对实际的序列对象执行计算),但是它会在每个元素之前检查这个条件。 如何在每个元素之后执行相同的检查?

以下代码演示了此问题

void RunIt() { List listOfCommands = new List(); listOfCommands.Add(new SomeCommand { CurrentIndex = 1, TotalCount = 3 }); listOfCommands.Add(new SomeCommand { CurrentIndex = 2, TotalCount = 3 }); listOfCommands.Add(new SomeCommand { CurrentIndex = 3, TotalCount = 3 }); var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex != c.TotalCount); obs.Subscribe(x => { Debug.WriteLine("{0} of {1}", x.CurrentIndex, x.TotalCount); }); } class SomeCommand { public int CurrentIndex; public int TotalCount; } 

这输出

 1 of 3 2 of 3 

我无法得到第三个元素

看看这个例子,你可能认为我所要做的就是改变我的状况 –

 var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex <= c.TotalCount); 

但是,observable永远不会完成(因为在我的真实世界代码中,流不会在这三个命令之后结束)

没有内置的运算符来执行您所要求的操作,但是这里使用Publish运行两个查询,而只订阅底层的observable一次:

 // Emits matching values, but includes the value that failed the filter public static IObservable TakeWhileInclusive( this IObservable source, Func predicate) { return source.Publish(co => co.TakeWhile(predicate) .Merge(co.SkipWhile(predicate).Take(1))); } 

然后:

 var obs = listOfCommands.ToObservable() .TakeWhileInclusive(c.CurrentIndex != c.TotalCount); 

最终编辑:

我根据谢尔盖的TakeWhileInclusive实现在此线程中建立了我的解决方案 – 如何根据事件中的条件完成Rx Observable

 public static IObservable TakeUntil( this IObservable source, Func predicate) { return Observable .Create(o => source.Subscribe(x => { o.OnNext(x); if (predicate(x)) o.OnCompleted(); }, o.OnError, o.OnCompleted )); } 

您可以使用TakeUntil运算符获取每个项目,直到辅助源生成值; 在这种情况下,我们可以将第二个流指定为谓词通过后的第一个值:

 public static IObservable TakeWhileInclusive( this IObservable source, Func predicate) { return source.TakeUntil(source.SkipWhile(x => predicate(x)).Skip(1)); } 

我想你是在TakeWhile之后,而不是TakeUntil

 var list = (new List(){1,2,3,4,5,6,7,8,9,10}); var takeWhile = list .ToObservable() .Select((_, i) => Tuple.Create(i, _)) .TakeWhile(tup => tup.Item1 < list.Count) .Do(_ => Console.WriteLine("Outputting {0}", _.Item2)); 

好吧,你想要的东西不是开箱即用的,至少我不知道具有特定语法的东西。 也就是说,你可以很容易地将它拼凑在一起(而且它不是讨厌):

 var fakeCmds = Enumerable .Range(1, 100) .Select(i => new SomeCommand() {CurrentIndex = i, TotalCount = 10}) .ToObservable(); var beforeMatch = fakeCmds .TakeWhile(c => c.CurrentIndex != c.TotalCount); var theMatch = fakeCmds .SkipWhile(c => c.CurrentIndex != c.TotalCount) .TakeWhile(c => c.CurrentIndex == c.TotalCount); var upToAndIncluding = Observable.Concat(beforeMatch, theMatch); 

组合,使用新的SkipUntilTakeUntil

SkipUntil return source.Publish(s => s.SkipUntil(s.Where(predicate)));

TakeUntil (包含) return source.Publish(s => s.TakeUntil(s.SkipUntil(predicate)));

完整来源 : https : //gist.github.com/GeorgeTsiokos/a4985b812c4048c428a981468a965a86