Rx如何通过键对一个复杂的对象进行分组,然后在不“停止”流的情况下执行SelectMany?

这与我的其他问题有关。 James World提出了如下解决方案:

// idStream is an IObservable of the input stream of IDs // alarmInterval is a Func that gets the interval given the ID var idAlarmStream = idStream .GroupByUntil(key => key, grp => grp.Throttle(alarmInterval(grp.Key))) .SelectMany(grp => grp.IgnoreElements().Concat(Observable.Return(grp.Key))); 

<编辑2:

问题:如何在不等待第一个事件到达的情况下立即启动计时器? 我猜这是我问题中的根本问题。 为此,我计划发送虚拟对象,其中包含我知道应该存在的ID。 但正如我在下面写的那样,我最终遇到了一些其他问题。 尽管如此,我认为解决这个问题也会很有趣。

然后转发其他有趣的部分! 现在,如果我想将一个复杂的对象分组如下,并按键分组,如下所示(不会编译)

 var idAlarmStream = idStream .Select(i => new { Id = i, IsTest = true }) .GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key))) .SelectMany(grp => grp.IgnoreElements().Concat(Observable.Return(grp.Key))); 

然后我遇到了麻烦。 我无法修改有关SelectManyConcatObservable.Return的部分,以便查询可以像以前一样工作。 例如,如果我将查询作为

 var idAlarmStream = idStream .Select(i => new { Id = i, IsTest = true }) .GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key))) .SelectMany(grp => grp.IgnoreElements().Concat(Observable.Return(grp.Key.First()))) .Subscribe(i => Console.WriteLine(i.Id + "-" + i.IsTest); 

然后需要两个事件才能在Subscribe观察输出。 这是第First呼叫的效果,我收集。 此外,我也想在对alarmInterval的调用中使用复杂的对象属性。

有人可以解释发生了什么,甚至是解决方案吗? 使用未经修改的解决方案的问题在于,分组不会单独查看ID值以获取键值,还会查看IsTest字段。

<edit:作为一个注释,问题可能是通过创建一个显式的类或结构然后实现一个自定义IEquatable然后然后使用James的代码来解决问题,这样分组就会由ID单独发生。 虽然感觉像黑客。

此外,如果您想要计算在闹钟响起之前看到某个项目的次数,您可以这样做,利用Select中的计数器重载。

 var idAlarmStream = idStream .Select(i => new { Id = i, IsTest = true }) .GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key)) .SelectMany(grp => grp.Select((count, alarm) => new { count, alarm }).TakeLast(1)); 

注意,对于第一个(种子)项目,这将是0 – 这可能是你想要的。

您正在Select中创建匿名类型。 让我们称之为A1。 我假设你的idStream是一个IObservable。 由于这是GroupByUntil的Key,因此您无需担心密钥比较 – int相等很好。

GroupByUntil是一个IObservable>

写入的SelectMany试图成为IObservable 。 你需要在这里只有Concat(Observable.Return(grp.Key)) – 但Key的类型和Group元素的类型必须匹配,否则SelectMany将不起作用。 所以关键也必须是A1。 匿名类型使用结构相等,返回类型将是A1的流 – 但您不能将其声明为公共返回类型。

如果你只想要Id,你应该在Throttle之后添加一个.Select(x => x.Id)

 var idAlarmStream = idStream .Select(i => new { Id = i, IsTest = true }) .GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key) .Select(x => x.Id)) .SelectMany(grp => grp.IgnoreElements().Concat(Observable.Return(grp.Key))); 

如果你想要A1 – 你需要创建一个实现Equality的具体类型。

编辑

我没有测试过它,但你也可以更简单地将它弄平,我认为这更容易! 它正在输出A1,所以如果你需要在某处返回流,你将不得不处理它。

 var idAlarmStream = idStream .Select(i => new { Id = i, IsTest = true }) .GroupByUntil(key => key.Id, grp => grp.Throttle(alarmInterval(grp.Key)) .SelectMany(grp => grp.TakeLast(1));