使用Reactive Extensions(RX),是否可以添加“暂停”命令?
我有一个接收事件流的类,并推出另一个事件流。
所有事件都使用Reactive Extensions(RX)。 使用.OnNext
将传入的事件流从外部源推送到IObserver
,并使用IObservable
和.Subscribe
推出传出的事件流。 我正在使用Subject
在幕后管理它。
我想知道RX中有什么技术暂时暂停输出。 这意味着传入事件将在内部队列中累积,当它们取消暂停时,事件将再次流出。
这是我使用Buffer和Window运算符的解决方案:
public static IObservable Pausable (this IObservable source, IObservable pauser) { var queue = source.Buffer(pauser.Where(toPause => !toPause), _ => pauser.Where(toPause => toPause)) .SelectMany(l => l.ToObservable()); return source.Window(pauser.Where(toPause => toPause).StartWith(true), _ => pauser.Where(toPause => !toPause)) .Switch() .Merge(queue); }
窗口在订阅时打开,每次从暂停流收到“真实”。 当pauser提供“false”值时它会关闭。
缓冲区执行它应该执行的操作,缓冲来自暂停的“false”和“true”之间的值。 一旦Buffer收到’true’,它就会输出IList值,这些值会立即全部流式传输。
DotNetFiddle链接: https ://dotnetfiddle.net/vGU5dJ
这是一个相当简单的Rx方式来做你想要的。 我创建了一个名为Pausable
的扩展方法,它接受一个source observable和一个暂停或恢复observable的boolean
的第二个observable。
public static IObservable Pausable ( this IObservable source, IObservable pauser) { return Observable.Create(o => { var paused = new SerialDisposable(); var subscription = Observable.Publish(source, ps => { var values = new ReplaySubject (); Func> switcher = b => { if (b) { values.Dispose(); values = new ReplaySubject (); paused.Disposable = ps.Subscribe(values); return Observable.Empty (); } else { return values.Concat(ps); } }; return pauser.StartWith(false).DistinctUntilChanged() .Select(p => switcher(p)) .Switch(); }).Subscribe(o); return new CompositeDisposable(subscription, paused); }); }
它可以像这样使用:
var xs = Observable.Generate( 0, x => x < 100, x => x + 1, x => x, x => TimeSpan.FromSeconds(0.1)); var bs = new Subject(); var pxs = xs.Pausable(bs); pxs.Subscribe(x => { /* Do stuff */ }); Thread.Sleep(500); bs.OnNext(true); Thread.Sleep(5000); bs.OnNext(false); Thread.Sleep(500); bs.OnNext(true); Thread.Sleep(5000); bs.OnNext(false);
现在,我唯一无法理解你的“传入事件流是什么意思的IObserver
”。 流是IObservable
。 观察者不是溪流。 听起来你在这里没有做点什么。 你能加入你的问题并进一步解释吗?
您可以使用Observable
模拟暂停/取消暂停。
一旦pauseObservable发出’暂停’值,缓冲事件直到pauseObservable发出’unpaused’值。
这是一个使用Dave Sexton的BufferUntil实现和Timothy Shields的Observable逻辑的例子(从我自己的问题开始)
//Input events, hot observable var source = Observable.Interval(TimeSpan.FromSeconds(1)) .Select(i => i.ToString()) .Publish().RefCount(); //Simulate pausing from Keyboard, not actually relevant within this answer var pauseObservable = Observable.FromEventPattern( k => KeyPressed += k, k => KeyPressed -= k) .Select(i => i.EventArgs.PressedKey) .Select(i => i == ConsoleKey.Spacebar) //space is pause, others are unpause .DistinctUntilChanged(); //Let events through when not paused var notPausedEvents = source.Zip(pauseObservable.MostRecent(false), (value, paused) => new {value, paused}) .Where(i => !i.paused) //not paused .Select(i => i.value) .Subscribe(Console.WriteLine); //When paused, buffer until not paused var pausedEvents = pauseObservable.Where(i => i) .Subscribe(_ => source.BufferUntil(pauseObservable.Where(i => !i)) .Select(i => String.Join(Environment.NewLine, i)) .Subscribe(Console.WriteLine));
改进的空间:可能将两个订阅源(pausedEvents和notPausedEvents)合并为一个。