从链式任务中观察
我正在尝试创建一个Observable,其中每个项目都是通过异步任务生成的。 下一项应该通过对前一项结果的异步调用(共同递归)生成。 在“生成”这个用语中,这看起来像这样 – 除了Generate不支持异步 (它也不支持初始状态的委托)。
var ob = Observable.Generate( async () => await ProduceFirst(), // Task ProduceFirst() prev => Continue(prev) // bool Continue(T); async prev => await ProduceNext(prev) // Task ProduceNext(T) item => item );
作为一个更具体的示例,要通过一次获取100条消息来查看ServiceBus队列中的所有消息,请按如下方式实现ProduceFirst,Continue和ProduceNext:
Task<IEnumerable> ProduceFirst() { const int batchSize = 100; return _serviceBusReceiver.PeekBatchAsync(batchSize); } bool Continue(IEnumerable prev) { return prev.Any(); } async Task<IEnumerable> ProduceNext(IEnumerable prev) { const int batchSize = 100; return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1) }
然后在IObservable<IEnumerable>
上调用.SelectMany(i => i)
将其转换为IObservable
其中_serviceBusReceiver是接口的实例,如下所示:
public interface IServiceBusReceiver { Task<IEnumerable> PeekBatchAsync(int batchSize); Task<IEnumerable> PeekBatchAsync(long fromSequenceNumber, int batchSize); }
BrokeredMessage来自https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx
如果您打算使用自己的异步Generate
函数,我建议使用递归调度而不是包装while循环。
public static IObservable Generate ( Func> initialState, Func condition, Func> iterate, Func resultSelector, IScheduler scheduler = null) { var s = scheduler ?? Scheduler.Default; return Observable.Create (async obs => { return s.Schedule(await initialState(), async (state, self) => { if (!condition(state)) { obs.OnCompleted(); return; } obs.OnNext(resultSelector(state)); self(await iterate(state)); }); }); }
这有几个优点。 首先,你可以取消这个,用一个简单的while循环就没有办法直接取消它,实际上你甚至不会在observable完成之前返回subscribe函数。 其次,这可以让你控制每个项目的调度/异步(这使得测试变得轻而易举),这也使它更适合于库
在做了一些测试后,我认为这可以很好地使用内置的Rx运算符。
public static IObservable Generate ( Func> initialState, Func condition, Func> iterate, Func resultSelector, IScheduler scheduler = null) { return Observable.Create (o => { var current = default(TResult); return Observable .FromAsync(initialState) .Select(y => resultSelector(y)) .Do(c => current = c) .Select(x => Observable .While( () => condition(current), Observable .FromAsync(() => iterate(current)) .Select(y => resultSelector(y)) .Do(c => current = c)) .StartWith(x)) .Switch() .Where(x => condition(x)) .ObserveOn(scheduler ?? Scheduler.Default) .Subscribe(o); }); }
我用以下代码测试了这段代码:
bool Continue(IEnumerable prev) { return prev.Any(); } Task> ProduceFirst() { return Task.FromResult( EnumerableEx.Return( new BrokeredMessage() { SequenceNumber = 1 })); } Task> ProduceNext(IEnumerable prev) { return Task.FromResult( prev.Last().SequenceNumber < 3 ? EnumerableEx.Return( new BrokeredMessage() { SequenceNumber = prev.Last().SequenceNumber + 1 }) : Enumerable.Empty ()); } public class BrokeredMessage { public int SequenceNumber; }
并运行此序列:
var ob = Generate( async () => await ProduceFirst(), prev => Continue(prev), async prev => await ProduceNext(prev), item => item);
我得到了这个结果:
我的测试代码也使用了Reactive Extension团队的Interactive Extensions – NuGet“Ix-Main”。
我想这可能是正确的答案:
这不是一个好的答案。 不使用。
我是由自己的Generate
创建的,它支持初始状态+迭代函数的async / await:
public static IObservable Generate ( Func> initialState, Func condition, Func> iterate, Func resultSelector ) { return Observable.Create (async obs => { var state = await initialState(); while (condition(state)) { var result = resultSelector(state); obs.OnNext(result); state = await iterate(state); } obs.OnCompleted(); return System.Reactive.Disposables.Disposable.Empty; }); }
不幸的是,这似乎产生了副作用,即消息的产生远远超过消费。 如果观察者缓慢地处理消息,那么在我们处理其中一些消息之前,这将获取数百万条消息。 不完全是我们想要的服务总线。
我将要完成上述工作,可能会阅读更多内容,并在需要时发布更具体的问题。
我自己也有类似的问题,并同意以下评论:
我可能违反了反应范式的精神,但这是我目前所需要的 – 它不应该继续从队列中提取消息,直到它们被处理(至少在不久的将来)。
我相信IAsyncEnumerable
的IAsyncEnumerable比IObservable
更适合这种情况 – 这里的问题和任何类似的异步展开函数。 原因是每次我们迭代然后从Task
提取结果时,流控制与我们(调用者)一起拉动下一个项目或选择不满足某个条件。 这就像IAsyncEnumerable
,而不像IObservable
,它在没有我们控制速率的情况下将项目推送给我们。
Ix.NET没有合适的AsyncEnumerable.Generate
版本所以我写了以下内容来解决这个问题。
public static IAsyncEnumerable Generate (TState initialState, Func condition, Func> iterate) { return AsyncEnumerable.CreateEnumerable(() => { var started = false; var current = default(TState); return AsyncEnumerable.CreateEnumerator(async c => { if (!started) { started = true; var conditionMet = !c.IsCancellationRequested && condition(initialState); if (conditionMet) current = initialState; return conditionMet; } { var newVal = await iterate(current).ConfigureAwait(false); var conditionMet = !c.IsCancellationRequested && condition(newVal); if (conditionMet) current = newVal; return conditionMet; } }, () => current, () => { }); }); }
笔记:
- 只是非常轻微的测试。
- 是否返回初始状态。
- 不返回第一个失败条件的TState,即使它已经完成了获得该结果的工作。 可能不同的版本可能包括这个。
- 我宁愿摆脱
condition
参数,因为它是一个拉系统,它完全取决于调用者是否调用MoveNext,因此condition
似乎是多余的。 它本质上是将TakeWhile
调用添加到函数的结果中。 但是我还没有深入了解Ix.NET以了解是否需要使用MoveNext
的false
响应来dispose
IAsyncEnumerator
,因此我已将其包含在内。
如果需要特定类型, IAsyncEnumerable
当然可以转换为IObservable
。