使用Reactive Extensions(RX),是否可以添加“暂停”命令?

我有一个接收事件流的类,并推出另一个事件流。

所有事件都使用Reactive Extensions(RX)。 使用.OnNext将传入的事件流从外部源推送到IObserver ,并使用IObservable.Subscribe推出传出的事件流。 我正在使用Subject在幕后管理它。

我想知道RX中有什么技术暂时暂停输出。 这意味着传入事件将在内部队列中累积,当它们取消暂停时,事件将再次流出。

这是我使用Buffer和Window运算符的解决方案:

 public static IObservable Pausable(this IObservable source, IObservable pauser) { var queue = source.Buffer(pauser.Where(toPause => !toPause), _ => pauser.Where(toPause => toPause)) .SelectMany(l => l.ToObservable()); return source.Window(pauser.Where(toPause => toPause).StartWith(true), _ => pauser.Where(toPause => !toPause)) .Switch() .Merge(queue); } 

窗口在订阅时打开,每次从暂停流收到“真实”。 当pauser提供“false”值时它会关闭。

缓冲区执行它应该执行的操作,缓冲来自暂停的“false”和“true”之间的值。 一旦Buffer收到’true’,它就会输出IList值,这些值会立即全部流式传输。

DotNetFiddle链接: https ://dotnetfiddle.net/vGU5dJ

这是一个相当简单的Rx方式来做你想要的。 我创建了一个名为Pausable的扩展方法,它接受一个source observable和一个暂停或恢复observable的boolean的第二个observable。

 public static IObservable Pausable( this IObservable source, IObservable pauser) { return Observable.Create(o => { var paused = new SerialDisposable(); var subscription = Observable.Publish(source, ps => { var values = new ReplaySubject(); Func> switcher = b => { if (b) { values.Dispose(); values = new ReplaySubject(); paused.Disposable = ps.Subscribe(values); return Observable.Empty(); } else { return values.Concat(ps); } }; return pauser.StartWith(false).DistinctUntilChanged() .Select(p => switcher(p)) .Switch(); }).Subscribe(o); return new CompositeDisposable(subscription, paused); }); } 

它可以像这样使用:

 var xs = Observable.Generate( 0, x => x < 100, x => x + 1, x => x, x => TimeSpan.FromSeconds(0.1)); var bs = new Subject(); var pxs = xs.Pausable(bs); pxs.Subscribe(x => { /* Do stuff */ }); Thread.Sleep(500); bs.OnNext(true); Thread.Sleep(5000); bs.OnNext(false); Thread.Sleep(500); bs.OnNext(true); Thread.Sleep(5000); bs.OnNext(false); 

现在,我唯一无法理解你的“传入事件流是什么意思的IObserver ”。 流是IObservable 。 观察者不是溪流。 听起来你在这里没有做点什么。 你能加入你的问题并进一步解释吗?

您可以使用Observable模拟暂停/取消暂停。

一旦pauseObservable发出’暂停’值,缓冲事件直到pauseObservable发出’unpaused’值。

这是一个使用Dave Sexton的BufferUntil实现和Timothy Shields的Observable逻辑的例子(从我自己的问题开始)

  //Input events, hot observable var source = Observable.Interval(TimeSpan.FromSeconds(1)) .Select(i => i.ToString()) .Publish().RefCount(); //Simulate pausing from Keyboard, not actually relevant within this answer var pauseObservable = Observable.FromEventPattern( k => KeyPressed += k, k => KeyPressed -= k) .Select(i => i.EventArgs.PressedKey) .Select(i => i == ConsoleKey.Spacebar) //space is pause, others are unpause .DistinctUntilChanged(); //Let events through when not paused var notPausedEvents = source.Zip(pauseObservable.MostRecent(false), (value, paused) => new {value, paused}) .Where(i => !i.paused) //not paused .Select(i => i.value) .Subscribe(Console.WriteLine); //When paused, buffer until not paused var pausedEvents = pauseObservable.Where(i => i) .Subscribe(_ => source.BufferUntil(pauseObservable.Where(i => !i)) .Select(i => String.Join(Environment.NewLine, i)) .Subscribe(Console.WriteLine)); 

改进的空间:可能将两个订阅源(pausedEvents和notPausedEvents)合并为一个。