什么是使用具有单个并发执行限制的Rx在c#中运行周期性任务的好方法?
我希望运行周期性任务,其限制是在任何给定时间最多只执行一次方法。
我正在尝试使用Rx,但我不确定一旦并发限制,最多如何强加。
var timer = Observable.Interval(TimeSpan.FromMilliseconds(100)); timer.Subscribe(tick => DoSomething());
此外,如果任务仍在运行,我希望后续计划过去。 即我不希望任务排队并导致问题。
我有2个这样的任务定期执行。 正在执行的任务当前是同步的。 但是,如果有必要,我可以让它们异步。
您应该按原样测试您的代码,因为这正是Rx已经强加的。
试试这个作为测试:
void Main() { var timer = Observable.Interval(TimeSpan.FromMilliseconds(100)); using (timer.Do(x => Console.WriteLine("!")).Subscribe(tick => DoSomething())) { Console.ReadLine(); } } private void DoSomething() { Console.Write("<"); Console.Write(DateTime.Now.ToString("HH:mm:ss.fff")); Thread.Sleep(1000); Console.WriteLine(">"); }
当你运行它时,你会得到这种输出:
! <16:54:57.111> ! <16:54:58.112> ! <16:54:59.113> ! <16:55:00.113> ! <16:55:01.114> ! <16:55:02.115> ! <16:55:03.116> ! <16:55:04.117> ! <16:55:05.118> ! <16:55:06.119
它已经确保没有重叠。
你是在正确的轨道上,你可以使用Select
+ Concat
来展平observable并限制飞行请求的数量(注意:如果你的任务需要的时间超过间隔时间,那么他们将开始叠加,因为他们不能执行得足够快):
var source = Observable.Interval(TimeSpan.FromMilliseconds(100)) //I assume you are doing async work since you want to limit concurrency .Select(_ => Observable.FromAsync(() => DoSomethingAsync())) //This is equivalent to calling Merge(1) .Concat(); source.Subscribe(/*Handle the result of each operation*/);
这是一个工厂function,完全符合您的要求。
public static IObservable Periodic(TimeSpan timeSpan) { return Observable.Return(Unit.Default).Concat(Observable.Return(Unit.Default).Delay(timeSpan).Repeat()); }
这是一个示例用法
Periodic(TimeSpan.FromSeconds(1)) .Subscribe(x => { Console.WriteLine(DateTime.Now.ToString("mm:ss:fff")); Thread.Sleep(500); });
如果你运行它,每个控制台打印将大约相隔1.5秒。
注意,如果您不希望第一个刻度线立即运行,您可以使用此工厂,直到时间跨度之后才会发送第一个单元。
public static IObservable DelayedPeriodic(TimeSpan timeSpan) { return Observable.Return(Unit.Default).Delay(timeSpan).Repeat(); }