RX:如何处理序列中的n个缓冲项,然后在处理下n个项之前等待t秒?

我想弄清楚如何处理序列中的n个缓冲项,然后在处理下n个项之前等待t秒?

这是我正在尝试做的粗略forms,使用Thread.Sleep()。 我想避免Thread.Sleep()并正确地执行它。

static void Main(string[] args) { var t = Observable.Range(0, 100000); var query = t.Buffer(20); query.ObserveOn(NewThreadScheduler.Default) .Subscribe(x => DoStuff(x)); Console.WriteLine("Press ENTER to exit"); Console.ReadLine(); } static void DoStuff(IList list) { Console.WriteLine(DateTime.Now); foreach (var value in list) { Console.WriteLine(value); } Thread.Sleep(TimeSpan.FromSeconds(10)); } 

任何人都可以帮我找到更多的RX方式吗?

谢谢

 // Instantiate this once, we'll use it in a closure multiple times. var delay = Observable.Empty().Delay(TimeSpan.FromMilliseconds(10)); // start with a source of individual items to be worked. Observable.Range(0, 100000) // Create batches of work. .Buffer(20) // Select an observable for the batch of work, and concat a delay. .Select(batch => batch.ToObservable().Concat(delay)) // Concat those together and form a "process, delay, repeat" observable. .Concat() // Subscribe! .Subscribe(Console.WriteLine); // Make sure we wait for our work to be done. // There are other ways to sync up, like async / await. Console.ReadLine(); 

或者,您也可以使用async / await同步:

 static IObservable delay = Observable.Empty().Delay(TimeSpan.FromMilliseconds(100)); static async Task Run() { await Observable.Range(0, 1000) .Buffer(20) .Select(batch => batch.ToObservable().Concat(delay)) .Concat() .Do(Console.WriteLine) .LastOrDefaultAsync(); } 

这种delay是不是可以观察到一个漂亮的伎俩? 它的工作原理是因为OnCompleted像OnNext一样被延迟了!

基于克里斯托弗的答案,如果你不希望列表元素变平,你可以做到:

 var delay = Observable.Empty>().Delay(TimeSpan.FromSeconds(10)); var query = Observable.Range(0, 100000) .Buffer(20) .Select(batch => Observable.Return(batch).Concat(delay)) .Concat(); query.Subscribe(list => { Console.WriteLine(DateTime.Now); foreach (var value in list) { Console.WriteLine(value); } });