Tag: system.reactive

在冷IObservable上暂停和恢复订阅

使用Rx ,我希望在以下代码中暂停和恢复function: 如何实现Pause()和Resume()? static IDisposable _subscription; static void Main(string[] args) { Subscribe(); Thread.Sleep(500); // Second value should not be shown after two seconds: Pause(); Thread.Sleep(5000); // Continue and show second value and beyond now: Resume(); } static void Subscribe() { var list = new List { 1, 2, 3, 4, 5 }; var obs = […]

Rx IObservable缓冲以平滑突发事件

我有一个Observable序列,可以快速突发产生事件(即:一个接一个地发生五个事件,然后是长时间延迟,然后是另一个快速突发事件等)。 我希望通过在事件之间插入一个短暂的延迟来平滑这些突发。 想象一下以下图表作为示例: Raw:–oooo ————– ooooo —– oo —————- ooo | 缓冲: – o – o – o – o ——– o – o – o – o – o – o – o ——— Ø – ○ – ○| 我目前的方法是通过Observable.Interval()生成类似节拍器的计时器,该计时器表示何时可以从原始流中拉出另一个事件。 问题是我无法弄清楚如何将该计时器与我的原始无缓冲可观察序列相结合。 IObservable.Zip()接近于我想做的事情,但它只有在原始流比定时器生成事件更快的情况下才有效。 一旦原始流中存在显着的间歇,计时器就会建立一系列不需要的事件,然后立即与原始流中的下一个事件突发事件配对。 理想情况下,我想要一个具有以下函数签名的IObservable扩展方法,该方法生成我上面概述的bevaior。 现在,来救我的StackOverflow 🙂 public static IObservable Buffered(this IObservable src, TimeSpan minDelay) […]

一种以均匀间隔推送缓冲事件的方法

我想要实现的是缓冲来自某些IObservable的传入事件(它们以突发forms进行)并进一步释放它们,但是以偶数间隔逐个释放它们。 像这样: -oo-ooo-oo——————oooo-oo-o————–> -o–o–o–o–o–o–o——–o–o–o–o–o–o–o—-> 因为我对Rx很新,所以我不确定是否已经有一个主题或运算符。 也许它可以通过构图完成? 更新: 感谢Richard Szalay指出Drain运算符,我找到了James Miles的Drain运算符使用的另一个例子 。 以下是我设法让它在WPF应用程序中工作的方式: .Drain(x => { Process(x); return Observable.Return(new Unit()) .Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher ); }).Subscribe(); 我有一些乐趣,因为省略scheduler参数会导致应用程序在调试模式下崩溃而不会出现任何exception(我需要学习如何处理Rx中的exception)。 Process方法直接修改了UI状态,但我想从中生成一个IObservable非常简单(使用ISubject?)。 更新: 与此同时,我一直在试验ISubject,下面的课程做了我想要的 – 它及时让出缓冲的Ts: public class StepSubject : ISubject { IObserver subscriber; Queue queue = new Queue(); MutableDisposable cancel = new MutableDisposable(); TimeSpan interval; IScheduler scheduler; bool idle = true; […]

Rx:我如何立即响应,并限制后续请求

我想建立一个可以立即响应事件的Rx订阅,然后忽略在指定的“冷却”时段内发生的后续事件。 开箱即用的Throttle / Buffer方法只在超时过后响应,这不是我需要的。 下面是一些设置场景的代码,并使用Throttle(这不是我想要的解决方案): class Program { static Stopwatch sw = new Stopwatch(); static void Main(string[] args) { var subject = new Subject(); var timeout = TimeSpan.FromMilliseconds(500); subject .Throttle(timeout) .Subscribe(DoStuff); var factory = new TaskFactory(); sw.Start(); factory.StartNew(() => { Console.WriteLine(“Batch 1 (no delay)”); subject.OnNext(1); }); factory.StartNewDelayed(1000, () => { Console.WriteLine(“Batch 2 (1s delay)”); subject.OnNext(2); […]

如何使用RX限制事件流?

我想有效地限制事件流,以便在收到第一个事件时调用我的委托,但如果接收到后续事件则不会持续1秒。 在超时(1秒)到期后,如果收到后续事件,我希望调用我的代理。 使用Reactive Extensions有一种简单的方法吗? 示例代码: static void Main(string[] args) { Console.WriteLine(“Running…”); var generator = Observable .GenerateWithTime(1, x => x x, x => TimeSpan.FromMilliseconds(1), x => x + 1) .Timestamp(); var builder = new StringBuilder(); generator .Sample(TimeSpan.FromSeconds(1)) .Finally(() => Console.WriteLine(builder.ToString())) .Subscribe(feed => builder.AppendLine(string.Format(“Observed {0:000}, generated at {1}, observed at {2}”, feed.Value, feed.Timestamp.ToString(“mm:ss.fff”), DateTime.Now.ToString(“mm:ss.fff”)))); Console.ReadKey(); } 当前输出: […]