Tag: system.reactive corecursion

从链式任务中观察

我正在尝试创建一个Observable,其中每个项目都是通过异步任务生成的。 下一项应该通过对前一项结果的异步调用(共同递归)生成。 在“生成”这个用语中,这看起来像这样 – 除了Generate不支持异步 (它也不支持初始状态的委托)。 var ob = Observable.Generate( async () => await ProduceFirst(), // Task ProduceFirst() prev => Continue(prev) // bool Continue(T); async prev => await ProduceNext(prev) // Task ProduceNext(T) item => item ); 作为一个更具体的示例,要通过一次获取100条消息来查看ServiceBus队列中的所有消息,请按如下方式实现ProduceFirst,Continue和ProduceNext: Task<IEnumerable> ProduceFirst() { const int batchSize = 100; return _serviceBusReceiver.PeekBatchAsync(batchSize); } bool Continue(IEnumerable prev) { return prev.Any(); } […]