在冷IObservable上暂停和恢复订阅

使用Rx ,我希望在以下代码中暂停和恢复function:

如何实现Pause()和Resume()?

static IDisposable _subscription; static void Main(string[] args) { Subscribe(); Thread.Sleep(500); // Second value should not be shown after two seconds: Pause(); Thread.Sleep(5000); // Continue and show second value and beyond now: Resume(); } static void Subscribe() { var list = new List { 1, 2, 3, 4, 5 }; var obs = list.ToObservable(); _subscription = obs.SubscribeOn(Scheduler.NewThread).Subscribe(p => { Console.WriteLine(p.ToString()); Thread.Sleep(2000); }, err => Console.WriteLine("Error"), () => Console.WriteLine("Sequence Completed") ); } static void Pause() { // Pseudocode: //_subscription.Pause(); } static void Resume() { // Pseudocode: //_subscription.Resume(); } 

Rx解决方案?

  • 我相信我可以使用某种布尔字段选通与线程锁定相结合( Monitor.WaitMonitor.Pulse

  • 但是有没有一个Rx运算符或其他一些反应速记来达到同样的目的?

它只是工作:

  class SimpleWaitPulse { static readonly object _locker = new object(); static bool _go; static void Main() { // The new thread will block new Thread (Work).Start(); // because _go==false. Console.ReadLine(); // Wait for user to hit Enter lock (_locker) // Let's now wake up the thread by { // setting _go=true and pulsing. _go = true; Monitor.Pulse (_locker); } } static void Work() { lock (_locker) while (!_go) Monitor.Wait (_locker); // Lock is released while we're waiting Console.WriteLine ("Woken!!!"); } } 

有关更多详细信息,请参阅如何使用等待和脉冲

这是一个相当简单的Rx方式来做你想要的。 我创建了一个名为Pausable的扩展方法,它接受一个source observable和一个暂停或恢复observable的boolean的第二个observable。

 public static IObservable Pausable( this IObservable source, IObservable pauser) { return Observable.Create(o => { var paused = new SerialDisposable(); var subscription = Observable.Publish(source, ps => { var values = new ReplaySubject(); Func> switcher = b => { if (b) { values.Dispose(); values = new ReplaySubject(); paused.Disposable = ps.Subscribe(values); return Observable.Empty(); } else { return values.Concat(ps); } }; return pauser.StartWith(false).DistinctUntilChanged() .Select(p => switcher(p)) .Switch(); }).Subscribe(o); return new CompositeDisposable(subscription, paused); }); } 

它可以像这样使用:

 var xs = Observable.Generate( 0, x => x < 100, x => x + 1, x => x, x => TimeSpan.FromSeconds(0.1)); var bs = new Subject(); var pxs = xs.Pausable(bs); pxs.Subscribe(x => { /* Do stuff */ }); Thread.Sleep(500); bs.OnNext(true); Thread.Sleep(5000); bs.OnNext(false); Thread.Sleep(500); bs.OnNext(true); Thread.Sleep(5000); bs.OnNext(false); 

使用PauseResume方法将它放在代码中应该相当容易。

这是作为IConnectableObservable的应用程序,我稍微纠正了较新的api(原文在这里 ):

 public static class ObservableHelper { public static IConnectableObservable WhileResumable(Func condition, IObservable source) { var buffer = new Queue(); var subscriptionsCount = 0; var isRunning = System.Reactive.Disposables.Disposable.Create(() => { lock (buffer) { subscriptionsCount--; } }); var raw = Observable.Create(subscriber => { lock (buffer) { subscriptionsCount++; if (subscriptionsCount == 1) { while (buffer.Count > 0) { subscriber.OnNext(buffer.Dequeue()); } Observable.While(() => subscriptionsCount > 0 && condition(), source) .Subscribe( v => { if (subscriptionsCount == 0) buffer.Enqueue(v); else subscriber.OnNext(v); }, e => subscriber.OnError(e), () => { if (subscriptionsCount > 0) subscriber.OnCompleted(); } ); } } return isRunning; }); return raw.Publish(); } } 

这是我的答案。 我相信暂停恢复可能存在竞争条件,但是可以通过将所有活动序列化到调度程序来减轻这种情况。 (赞成序列化而不是同步)。

 using System; using System.Reactive.Concurrency; using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; using Microsoft.Reactive.Testing; using NUnit.Framework; namespace StackOverflow.Tests.Q7620182_PauseResume { [TestFixture] public class PauseAndResumeTests { [Test] public void Should_pause_and_resume() { //Arrange var scheduler = new TestScheduler(); var isRunningTrigger = new BehaviorSubject(true); Action pause = () => isRunningTrigger.OnNext(false); Action resume = () => isRunningTrigger.OnNext(true); var source = scheduler.CreateHotObservable( ReactiveTest.OnNext(0.1.Seconds(), 1), ReactiveTest.OnNext(2.0.Seconds(), 2), ReactiveTest.OnNext(4.0.Seconds(), 3), ReactiveTest.OnNext(6.0.Seconds(), 4), ReactiveTest.OnNext(8.0.Seconds(), 5)); scheduler.Schedule(TimeSpan.FromSeconds(0.5), () => { pause(); }); scheduler.Schedule(TimeSpan.FromSeconds(5.0), () => { resume(); }); //Act var sut = Observable.Create>(o => { var current = source.Replay(); var connection = new SerialDisposable(); connection.Disposable = current.Connect(); return isRunningTrigger .DistinctUntilChanged() .Select(isRunning => { if (isRunning) { //Return the current replayed values. return current; } else { //Disconnect and replace current. current = source.Replay(); connection.Disposable = current.Connect(); //yield silence until the next time we resume. return Observable.Never(); } }) .Subscribe(o); }).Switch(); var observer = scheduler.CreateObserver(); using (sut.Subscribe(observer)) { scheduler.Start(); } //Assert var expected = new[] { ReactiveTest.OnNext(0.1.Seconds(), 1), ReactiveTest.OnNext(5.0.Seconds(), 2), ReactiveTest.OnNext(5.0.Seconds(), 3), ReactiveTest.OnNext(6.0.Seconds(), 4), ReactiveTest.OnNext(8.0.Seconds(), 5) }; CollectionAssert.AreEqual(expected, observer.Messages); } } }