仅在满足特定条件时才会节流

我有一个观察我订阅的。 这个obsevable将返回一个对象,该对象具有一个名为ActivationType的属性,可以多次设置。

我想要实现的是每当ActivationType设置为“Type1”时记录一条消息。 但是,如果ActivationType设置为“Type2”,则只记录消息一次并等待30秒再次登录,如果ActivationType为“Type2”。

如果我有:

myObservable .Where(o => o.ActivationType == "Type1" || o.ActivationType == "Type2") //listen for types 1 and 2 .Throttle() // ??? somehow only throttle if we are currently looking at Type2 .Subscribe(Log); //log some stuff 

我相信Throttle()是我正在寻找的,但我不确定如何有条件地触发它。

有什么建议?

啊,对于几乎不可能理解的Window操作员来说,这是一个完美的案例!

编辑:我发布这个链接一个月十几次,我发誓 – 最好的阅读 – 我见过的WindowJoinBufferGroupJoin等运营商:

Lee Campbell:Rx第9部分 – 加入,窗口,缓冲和组加入

 var source = new Subject(); var feed = source.Publish().RefCount(); var ofType1 = feed.Where(t => t.ActivationType == "Type1"); var ofType2 = feed // only window the type2s .Where(t => t.ActivationType == "Type2") // our "end window selector" will be a tick 30s off from start .Window(() => Observable.Timer(TimeSpan.FromSeconds(30))) // we want the first one in each window... .Select(lst => lst.Take(1)) // moosh them all back together .Merge(); // We want all "type 1s" and the buffered outputs of "type 2s" var query = ofType1.Merge(ofType2); // Let's set up a fake stream of data var running = true; var feeder = Task.Factory.StartNew( () => { // until we say stop... while(running) { // pump new Things into the stream every 500ms source.OnNext(new Thing()); Thread.Sleep(500); } }); using(query.Subscribe(Console.WriteLine)) { // Block until we hit enter so we can see the live output // from the above subscribe Console.ReadLine(); // Shutdown our fake feeder running = false; feeder.Wait(); } 

为什么不直接使用两个流?

 var baseStream = myObservable.Publish().RefCount(); // evaluate once var type1 = baseStream.Where(o => o.ActivationType == "Type1"); var type2 = baseStream.Where(o => o.ActivationType == "Type2").Throttle(TimeSpan.FromSeconds(30)); type1.Merge(type2).Subscribe(Log);