从链式任务中观察

我正在尝试创建一个Observable,其中每个项目都是通过异步任务生成的。 下一项应该通过对前一项结果的异步调用(共同递归)生成。 在“生成”这个用语中,这看起来像这样 – 除了Generate不支持异步 (它也不支持初始状态的委托)。

var ob = Observable.Generate( async () => await ProduceFirst(), // Task ProduceFirst() prev => Continue(prev) // bool Continue(T); async prev => await ProduceNext(prev) // Task ProduceNext(T) item => item ); 

作为一个更具体的示例,要通过一次获取100条消息来查看ServiceBus队列中的所有消息,请按如下方式实现ProduceFirst,Continue和ProduceNext:

 Task<IEnumerable> ProduceFirst() { const int batchSize = 100; return _serviceBusReceiver.PeekBatchAsync(batchSize); } bool Continue(IEnumerable prev) { return prev.Any(); } async Task<IEnumerable> ProduceNext(IEnumerable prev) { const int batchSize = 100; return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1) } 

然后在IObservable<IEnumerable>上调用.SelectMany(i => i)将其转换为IObservable

其中_serviceBusReceiver是接口的实例,如下所示:

 public interface IServiceBusReceiver { Task<IEnumerable> PeekBatchAsync(int batchSize); Task<IEnumerable> PeekBatchAsync(long fromSequenceNumber, int batchSize); } 

BrokeredMessage来自https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx

如果您打算使用自己的异步Generate函数,我建议使用递归调度而不是包装while循环。

 public static IObservable Generate( Func> initialState, Func condition, Func> iterate, Func resultSelector, IScheduler scheduler = null) { var s = scheduler ?? Scheduler.Default; return Observable.Create(async obs => { return s.Schedule(await initialState(), async (state, self) => { if (!condition(state)) { obs.OnCompleted(); return; } obs.OnNext(resultSelector(state)); self(await iterate(state)); }); }); } 

这有几个优点。 首先,你可以取消这个,用一个简单的while循环就没有办法直接取消它,实际上你甚至不会在observable完成之前返回subscribe函数。 其次,这可以让你控制每个项目的调度/异步(这使得测试变得轻而易举),这也使它更适合于库

在做了一些测试后,我认为这可以很好地使用内置的Rx运算符。

 public static IObservable Generate( Func> initialState, Func condition, Func> iterate, Func resultSelector, IScheduler scheduler = null) { return Observable.Create(o => { var current = default(TResult); return Observable .FromAsync(initialState) .Select(y => resultSelector(y)) .Do(c => current = c) .Select(x => Observable .While( () => condition(current), Observable .FromAsync(() => iterate(current)) .Select(y => resultSelector(y)) .Do(c => current = c)) .StartWith(x)) .Switch() .Where(x => condition(x)) .ObserveOn(scheduler ?? Scheduler.Default) .Subscribe(o); }); } 

我用以下代码测试了这段代码:

 bool Continue(IEnumerable prev) { return prev.Any(); } Task> ProduceFirst() { return Task.FromResult( EnumerableEx.Return( new BrokeredMessage() { SequenceNumber = 1 })); } Task> ProduceNext(IEnumerable prev) { return Task.FromResult( prev.Last().SequenceNumber < 3 ? EnumerableEx.Return( new BrokeredMessage() { SequenceNumber = prev.Last().SequenceNumber + 1 }) : Enumerable.Empty()); } public class BrokeredMessage { public int SequenceNumber; } 

并运行此序列:

 var ob = Generate( async () => await ProduceFirst(), prev => Continue(prev), async prev => await ProduceNext(prev), item => item); 

我得到了这个结果:

结果

我的测试代码也使用了Reactive Extension团队的Interactive Extensions – NuGet“Ix-Main”。

我想这可能是正确的答案:

这不是一个好的答案。 不使用。

我是由自己的Generate创建的,它支持初始状态+迭代函数的async / await:

  public static IObservable Generate( Func> initialState, Func condition, Func> iterate, Func resultSelector ) { return Observable.Create(async obs => { var state = await initialState(); while (condition(state)) { var result = resultSelector(state); obs.OnNext(result); state = await iterate(state); } obs.OnCompleted(); return System.Reactive.Disposables.Disposable.Empty; }); } 

不幸的是,这似乎产生了副作用,即消息的产生远远超过消费。 如果观察者缓慢地处理消息,那么在我们处理其中一些消息之前,这将获取数百万条消息。 不完全是我们想要的服务总线。

我将要完成上述工作,可能会阅读更多内容,并在需要时发布更具体的问题。

我自己也有类似的问题,并同意以下评论:

我可能违反了反应范式的精神,但这是我目前所需要的 – 它不应该继续从队列中提取消息,直到它们被处理(至少在不久的将来)。

我相信IAsyncEnumerable的IAsyncEnumerable比IObservable更适合这种情况 – 这里的问题和任何类似的异步展开函数。 原因是每次我们迭代然后从Task提取结果时,流控制与我们(调用者)一起拉动下一个项目或选择不满足某个条件。 这就像IAsyncEnumerable ,而不像IObservable ,它在没有我们控制速率的情况下将项目推送给我们。

Ix.NET没有合适的AsyncEnumerable.Generate版本所以我写了以下内容来解决这个问题。

  public static IAsyncEnumerable Generate(TState initialState, Func condition, Func> iterate) { return AsyncEnumerable.CreateEnumerable(() => { var started = false; var current = default(TState); return AsyncEnumerable.CreateEnumerator(async c => { if (!started) { started = true; var conditionMet = !c.IsCancellationRequested && condition(initialState); if (conditionMet) current = initialState; return conditionMet; } { var newVal = await iterate(current).ConfigureAwait(false); var conditionMet = !c.IsCancellationRequested && condition(newVal); if (conditionMet) current = newVal; return conditionMet; } }, () => current, () => { }); }); } 

笔记:

  • 只是非常轻微的测试。
  • 是否返回初始状态。
  • 不返回第一个失败条件的TState,即使它已经完成了获得该结果的工作。 可能不同的版本可能包括这个。
  • 我宁愿摆脱condition参数,因为它是一个拉系统,它完全取决于调用者是否调用MoveNext,因此condition似乎是多余的。 它本质上是将TakeWhile调用添加到函数的结果中。 但是我还没有深入了解Ix.NET以了解是否需要使用MoveNextfalse响应来dispose IAsyncEnumerator ,因此我已将其包含在内。

如果需要特定类型, IAsyncEnumerable当然可以转换为IObservable