无功扩展定时器/间隔复位

我有一个项目,我需要每10秒发送一次状态消息,除非在此期间有更新。 意思是,每次有更新时,计时器都会重置。

var res = Observable .Interval(TimeSpan.FromSeconds(10)) .Where(_ => condition); res.Subscribe(_ => Console.WriteLine("Status sent.")); 

现在我知道“Where”只会在计时器结束时应用,所以它没有帮助。 但是,我想知道是否有办法重置间隔; 或者使用带有重复的Timer()。

使用标准Rx运算符很容易实现。

你的问题中不清楚的是“更新”到底是什么。 我将假设您有某种可观察的.OnNext(...)可以触发每次更新,或者您可以在有更新时创建一个您将调用的主题.OnNext(...) 。 如果没有可观察到的更新,很难知道何时重置计时器。

所以这是代码:

 var update = new Subject(); var res = update .Select(x => Observable.Interval(TimeSpan.FromSeconds(10.0))) .Switch(); res .Subscribe(_ => Console.WriteLine("Status sent.")); update.OnNext(true); 

res查询现在等待,直到从update获取值,然后它选择一个新的Observable.Interval 。 这意味着在Select ,类型是IObservable> ,因此.Switch()需要将其转换为IObservable.Switch()通过仅传递最近观察到的可观察值并处置任何先前的可观察量来实现此目的。 换句话说,对于每次update启动新计时器并取消先前的计时器。 这意味着如果您的更新发生频率超过10秒,那么计时器将永远不会触发。

现在,如果res observable本身就是一个更新,那么你可以这样做:

 res .Subscribe(_ => { update.OnNext(true); Console.WriteLine("Status sent."); }); 

这很好 – 它仍然可以工作,但对于每个计时器,激活res将创建一个新的计时器。 这意味着依赖于您的update observable / subject的任何内容仍然可以正常运行。

我跟这个小助手方法:

 public static IObservable CreateAutoResetInterval(IObservable resetter, TimeSpan timeSpan, bool immediate = false) { return resetter.Select(_ => immediate ? Observable.Interval(timeSpan).StartWith(0) : Observable.Interval(timeSpan)).Switch(); } 

它与Enigmativity的答案基本相同

我想你也可以在这里使用Throttle。 Throttle的目的不是让元素通过在给定的时间跨度内收到另一个元素。 因此,在您的情况下,如果在10秒内收到更新消息,则不发送状态。 参见下面的unit testing,它使用200个刻度作为节流周期。

 [TestMethod] public void Publish_Status_If_Nothing_Receieved() { //Arrange var scheduler = new TestScheduler(); var statusObserver = scheduler.CreateObserver(); var updateStream = scheduler.CreateColdObservable(OnNext(100, 1), OnNext(200, 2), OnNext(600, 3), OnNext(700, 4)); var waitTime = TimeSpan.FromTicks(200); //Act updateStream.Throttle(waitTime, scheduler) .Select(_ => Unit.Default) .Subscribe(statusObserver); //Verify no status received scheduler.AdvanceTo(100); Assert.AreEqual(0, statusObserver.Messages.Count); //Verify no status received scheduler.AdvanceTo(200); Assert.AreEqual(0, statusObserver.Messages.Count); //Assert status received scheduler.AdvanceTo(400); statusObserver.Messages.AssertEqual(OnNext(400, Unit.Default)); //Verify no more status received scheduler.AdvanceTo(700); statusObserver.Messages.AssertEqual(OnNext(400, Unit.Default)); } 

在您阅读下面的代码之前,这里是您的要求的细分:

1)如果10秒内没有状态更新则执行的function(定时器)

2)如果状态更新,定时器将被重置

3)如果函数正在执行,定时器将停止,然后在函数完成执行后重新开始。 (假设该函数比定时器的经过时间花费更多时间执行,则防止同时执行该函数两次。

4)计时器上的任何更改都必须是primefaces的(一次只能有1个线程访问它!)

  static object RequestStatusLock = new object(); ... System.Threading.Timer timer = new System.Threading.Timer(10000); //10second timer.Elapsed += timer_elapsed; //timer elapsed will be an event where you will do request status. void KickWatchDog() { if(Monitor.TryEnter(RequestStatusLock)) { //calling Stop() and Start() will reset and restart the timer. timer.Stop(); timer.Start(); Monitor.Exit(RequestStatusLock) } } void timer_elapsed(object sender,EventArgs e) { if(Monitor.TryEnter(RequestStatusLock) { //if it can get into this section means that no update from the last 10 second! timer.Stop(); RequestStatus(); timer.Start(); Monitor.Exit(RequestStatusLock); } } 

要记住的一件事是,建议在计时器耗尽时围绕您正在执行的函数使用try / catch块。 原因是,如果它出现故障,你会希望释放锁(如果你希望程序继续运行)。

注意:有两种类型的计时器, System.Threading.Timer和System.Timer (它们是完全相同的!),这就是为什么我明确地将它声明为System.Threading.Timer