Tag: system.reactive

如何关联function输入和输出?

考虑下面的简单程序。 它有一个可观察的整数和一个函数来计算最近发布的整数是偶数还是奇数。 出乎意料的是,该程序在报告数字发生变化之前报告最近的数字是偶数/奇数。 static void Main(string[] args) { int version = 0; var numbers = new Subject(); IObservable isNumberEven = numbers.Select(i => i % 2 == 0); isNumberEven .Select(i => new { IsEven = i, Version = Interlocked.Increment(ref version) }) .Subscribe(i => Console.WriteLine($”Time {i.Version} : {i.IsEven}”)); numbers .Select(i => new { Number = i, Version […]

如何基于Func 将IObservable 窗口/缓冲到块中

给出一个类: class Foo { DateTime Timestamp {get; set;} } …和一个IObservable ,保证单调增加 Timestamp s,如何根据这些Timestamp s生成一个嵌入到列表中的IObservable<IList> ? 即每个IList应该有五秒钟的事件,或者其他什么。 我知道我可以使用带有TimeSpan重载的Buffer ,但我需要花时间从事件本身,而不是挂钟。 (除非有一个聪明的方法在这里提供一个IScheduler ,它使用IObservable本身作为IScheduler的来源?) 如果我尝试使用Observable.Buffer(this IObservable source, IObservable bufferBoundaries)重载如下: IObservable foos = //…; var pub = foos.Publish(); var windows = pub.Select(x => new DateTime( x.Ticks – x.Ticks % TimeSpan.FromSeconds(5).Ticks)).DistinctUntilChanged(); pub.Buffer(windows).Subscribe(x => t.Dump())); // linqpad pub.Connect(); …然后IList实例包含导致窗口关闭的项目 ,但我真的希望这个项目进入下一个窗口/缓冲区。 例如,使用时间戳[0, 1, […]

Rx Throttle(…)。ObserveOn(调度程序)和Throttle(…,调度程序)之间的区别

我有以下代码: IDisposable subscription = myObservable.Throttle(TimeSpan.FromMilliseconds(50), RxApp.MainThreadScheduler) .Subscribe(_ => UpdateUi()); 正如所料, UpdateUi()将始终在主线程上执行。 当我将代码更改为 IDisposable subscription = myObservable.Throttle(TimeSpan.FromMilliseconds(50)) .ObserveOn(RxApp.MainThreadScheduler) .Subscribe(_ => UpdateUi()); UpdateUI()将在后台线程中执行。 为什么不是Throttle(…).ObserveOn(scheduler)等同于Throttle(…, scheduler) ?

学习Rx:如何在.Window的输出上使用.Scan来获得可观察的bool值序列

我有一系列真假值,如此 var alternatingTrueFalse = Observable.Generate( true, _ => true, x => !x, x => x, _ => TimeSpan.FromMilliseconds(new Random().Next(2000))) .Take(20).Publish(); alternatingTrueFalse.Connect(); var buffered = alternatingTrueFalse .Buffer(TimeSpan.FromMilliseconds(500)) .Subscribe(x => Console.WriteLine($”{TimeStamp} {string.Join(“,”, x)}”)); 我想用500毫秒(最大)窗口/缓冲区来查看序列。 如果在一个这样的窗口中只有一个真值(没有别的),我想翻转一个开关(只需调用一个命令,现在打印到控制台)。 然后,当下一个假值到达时,我想要向后翻转开关并关闭原始序列的当前窗口/缓冲区并开始一个新的。 使用Buffer + Scan翻转开关 到目前为止,我已经想出了一种在Buffer上做到这一点的方法。 但是,缓冲区打开时间过长,始终为500毫秒。 var buffered = alternatingTrueFalse .Buffer(TimeSpan.FromMilliseconds(500)); var output = buffered .Subscribe(x => Console.WriteLine($”{TimeStamp} {string.Join(“,”, x)}”)); var isFlipped […]

仅在满足特定条件时才会节流

我有一个观察我订阅的。 这个obsevable将返回一个对象,该对象具有一个名为ActivationType的属性,可以多次设置。 我想要实现的是每当ActivationType设置为“Type1”时记录一条消息。 但是,如果ActivationType设置为“Type2”,则只记录消息一次并等待30秒再次登录,如果ActivationType为“Type2”。 如果我有: myObservable .Where(o => o.ActivationType == “Type1” || o.ActivationType == “Type2”) //listen for types 1 and 2 .Throttle() // ??? somehow only throttle if we are currently looking at Type2 .Subscribe(Log); //log some stuff 我相信Throttle()是我正在寻找的,但我不确定如何有条件地触发它。 有什么建议?

如何在完成之前从ReplaySubject 获取最新值

我需要一种方法来获取添加到符合特定条件的ReplaySubject的最新项目。 下面的示例代码完成了我需要它做的事情,但它感觉不像正确的方法: static void Main(string[] args) { var o = new ReplaySubject(); o.OnNext(“blueberry”); o.OnNext(“chimpanzee”); o.OnNext(“abacus”); o.OnNext(“banana”); o.OnNext(“apple”); o.OnNext(“cheese”); var latest = o.Where(i => i.StartsWith(“b”)) .Latest().First(); Console.WriteLine(latest); Console.WriteLine(“Press any key to exit”); Console.ReadKey(); } 输出: banana Press any key to exit 最初,我尝试使用.Where().TakeLast(1) ; 但是,我现在从上一个问题知道你必须在TakeLast()之前TakeLast()才能返回任何内容。 调用OnComplete()对我来说不是一个选项,因为我需要保持此流打开。 任何人都可以validation这是否是我想要完成的最有效的方法吗? 谢谢! 编辑 请注意,我正在使用Reactive Extensions,IEnumerable代码示例将无法正常工作。 UPDATE 我倾向于以下代码,因为我认为它是非阻塞的,除非任何人都能以不同的方式告诉我: var latest = o.Where(i […]

如何让IObservable在订阅时推送最新值

通常,当您订阅值的更改时,您也有兴趣知道初始值。 我希望我的IObservable缓存最新(或初始)值并在订阅时推送该值。 当使用普通事件时,我经常会得到类似的代码 x.SomeEvent += SomeEventHandler; SomeEventHandler(x, EventArgs.Empty); 使用IObservable我希望用推送初始值的东西包装事件。 如果我有多个订阅者,他们应该在订阅时收到最新的值我有一些代码,如果我在创建IObservable后立即订阅,但如果事件在订阅之前触发则不会: class Program { static void Main() { var s = new Source { Value = 1 }; var values = Observable.Return(s.Value).Concat( Observable.FromEvent( h => s.ValueChanged += h, h => s.ValueChanged -= h) .Select(_ => s.Value)); using (values.Subscribe(Console.WriteLine)) { s.Value = 2; // prints 1,2 as […]

Reactive Extensions超时不会停止序列?

我正在尝试创建一个IObservable ,如果在最后5秒内收到UDP消息,则返回true ,如果发生超时,则返回false。 到目前为止我有这个: public IObservable GettingUDPMessages(IPEndPoint localEP) { var udp = BaseComms.UDPBaseStringListener(localEP) .Where(msg => msg.Data.Contains(“running”)) .Select(s => true); return Observable .Timeout(udp, TimeSpan.FromSeconds(5)) .Catch(Observable.Return(false)); } 这个问题是: – 一旦返回false,序列就会停止 我只是在状态变化上真的需要true或false 。 我可以使用Subject但是当没有更多订阅者时我需要小心处理UDPBaseStringListener observable。 更新 每次我收到UDP消息时,我都希望它返回true 。 如果我在最近5秒内没有收到UDP消息,我希望它返回false 。

Reactive Extensions(Rx) – 当间隔中没有值时,具有最后已知值的样本

我有一个可观察的流,以不一致的间隔生成值,如下所示: ——1—2——3—————-4————–5— 我希望对此进行采样,但是一旦产生了一个值,就没有任何空样本: ——1—2——3—————-4————–5—– —-_—-1—-2—-3—-3—-3—-4—-4—-4—-5—-5 我显然认为Replay().RefCount()可以在这里用来向Sample()提供最后一个已知的值,但由于它没有重新订阅源流,所以它没有用完。 有关如何做到这一点的任何想法?

Rx框架:在超时时执行操作,而不会中断原始的可观察序列

给定一个可观察的源,通过轮询(更改a)低级设备的状态生成… // observable source metacode: IObservable source = Observable.Interval(TimeSpan.FromSeconds(0.5)) .Select(tick => new DeviceState(_device.ReadValue())) .DistinctUntilChanged(); ……以及更新UI的消费者…… // UI metacode: service.GetObservableDeviceStates() .Subscribe(state => viewModel.CurrentState = state.ToString()); …我需要在x秒的源“不活动”后执行自定义操作,而不会中断对源的订阅。 像这样的东西: // UI metacode: service.GetObservableDeviceStates() .DoOnTimeout(TimeSpan.FromSeconds(x), () => viewModel.CurrentState = “Idle”) .Subscribe(state => viewModel.CurrentState = state.ToString()); 什么是最佳做法? 想到的可能解决方案是(我是Rx noob): 缓冲区 (即使它不那么可读) 玩这个超时超载 ; 当没有任何变化(而不是使用DistinctUntilChanged)并在UI代码上处理它时,返回一些特殊的“服务端”: service.GetObservableDeviceStates()。Subscribe(state => viewModel.CurrentState = state.Special?“Idle”:state.ToString()); 编辑:如答案中所述 […]