什么是使用具有单个并发执行限制的Rx在c#中运行周期性任务的好方法?

我希望运行周期性任务,其限制是在任何给定时间最多只执行一次方法。

我正在尝试使用Rx,但我不确定一旦并发限制,最多如何强加。

var timer = Observable.Interval(TimeSpan.FromMilliseconds(100)); timer.Subscribe(tick => DoSomething()); 

此外,如果任务仍在运行,我希望后续计划过去。 即我不希望任务排队并导致问题。

我有2个这样的任务定期执行。 正在执行的任务当前是同步的。 但是,如果有必要,我可以让它们异步。

您应该按原样测试您的代码,因为这正是Rx已经强加的。

试试这个作为测试:

 void Main() { var timer = Observable.Interval(TimeSpan.FromMilliseconds(100)); using (timer.Do(x => Console.WriteLine("!")).Subscribe(tick => DoSomething())) { Console.ReadLine(); } } private void DoSomething() { Console.Write("<"); Console.Write(DateTime.Now.ToString("HH:mm:ss.fff")); Thread.Sleep(1000); Console.WriteLine(">"); } 

当你运行它时,你会得到这种输出:

 ! <16:54:57.111> ! <16:54:58.112> ! <16:54:59.113> ! <16:55:00.113> ! <16:55:01.114> ! <16:55:02.115> ! <16:55:03.116> ! <16:55:04.117> ! <16:55:05.118> ! <16:55:06.119 

它已经确保没有重叠。

你是在正确的轨道上,你可以使用Select + Concat来展平observable并限制飞行请求的数量(注意:如果你的任务需要的时间超过间隔时间,那么他们将开始叠加,因为他们不能执行得足够快):

 var source = Observable.Interval(TimeSpan.FromMilliseconds(100)) //I assume you are doing async work since you want to limit concurrency .Select(_ => Observable.FromAsync(() => DoSomethingAsync())) //This is equivalent to calling Merge(1) .Concat(); source.Subscribe(/*Handle the result of each operation*/); 

这是一个工厂function,完全符合您的要求。

 public static IObservable Periodic(TimeSpan timeSpan) { return Observable.Return(Unit.Default).Concat(Observable.Return(Unit.Default).Delay(timeSpan).Repeat()); } 

这是一个示例用法

 Periodic(TimeSpan.FromSeconds(1)) .Subscribe(x => { Console.WriteLine(DateTime.Now.ToString("mm:ss:fff")); Thread.Sleep(500); }); 

如果你运行它,每个控制台打印将大约相隔1.5秒。

注意,如果您不希望第一个刻度线立即运行,您可以使用此工厂,直到时间跨度之后才会发送第一个单元。

 public static IObservable DelayedPeriodic(TimeSpan timeSpan) { return Observable.Return(Unit.Default).Delay(timeSpan).Repeat(); }