react native管道 ​​- 如何控制并行性?

我正在构建一个简单的处理管道,其中一个项目被作为输入提取,它由多个处理器以顺序方式操作,最后输出。 下图描述了整体架构:

RX-管

它当前的工作方式 :Pipeline尽可能快地从提供者那里获取项目。 一旦获取了一个项目,它就会被传递给处理器。 处理完项目后,将通知输出。 虽然以顺序方式处理单个项目,但是可以并行处理多个项目(取决于从提供者获取它们的速度)。

从管道创建并返回的IObservable如下所示:

 return Observable.Create(async observer => { while (_provider.HasNext) { T item = await _provider.GetNextAsync(); observer.OnNext(item); } }).SelectMany(item => Observable.FromAsync(() => _processors.Aggregate( seed: Task.FromResult(item), func: (current, processor) => current.ContinueWith( // Append continuations. previous => processor.ProcessAsync(previous.Result)) .Unwrap()))); // We need to unwrap Task{T} from Task{Task{T}}; 

缺少的部分 :我需要一个控制机制来控制在任何给定时间管道中可以有多少项(最大)

例如,如果max并行处理为3 ,那么将导致以下工作流程:

  1. 获取项目1并将其传递给处理器。
  2. 获取项目2并将其传递给处理器。
  3. 获取项目3并将其传递给处理器。
  4. 第1项已完成处理。
  5. 获取项目4并将其传递给处理器。
  6. 第3项完成处理。
  7. 获取项目5并将其传递给处理器。
  8. 等等…

Merge提供了一个需要最大并发性的重载。

它的签名如下: IObservable Merge(this IObservable> source, int maxConcurrency);

这是你的例子的样子(我重构了一些其他代码,你可以采取或离开):

 return Observable //Reactive while loop also takes care of the onComplete for you .While(() => _provider.HasNext, Observable.FromAsync(_provider.GetNextAsync)) //Makes return items that will only execute after subscription .Select(item => Observable.Defer(() => { return _processers.Aggregate( seed: Observable.Return(item), func: (current, processor) => current.SelectMany(processor.ProcessAsync)); })) //Only allow 3 streams to be execute in parallel. .Merge(3); 

要打破这一点,

  1. While将检查每次迭代,如果_provider.HasNext为true,如果是,那么它将重新订阅以获取_provider的下一个值,否则它将发出onCompleted
  2. 在select中创建了一个新的可观察流,但尚未使用Defer进行评估
  3. 返回的IObservable>将传递给MergeMerge同时订阅最多3个可观察对象。
  4. 内部observable最终评估订阅的时间。

备选方案1

如果您还需要控制并行请求的数量,则需要更加棘手,因为您需要发出信号表明您的Observable已准备好接受新值:

 return Observable.Create(observer => { var subject = new Subject(); var disposable = new CompositeDisposable(subject); disposable.Add(subject //This will complete when provider has run out of values .TakeWhile(_ => _provider.HasNext) .SelectMany( _ => _provider.GetNextAsync(), (_, item) => { return _processors .Aggregate( seed: Observable.Return(item), func: (current, processor) => current.SelectMany(processor.ProcessAsync)) //Could also use `Finally` here, this signals the chain //to start on the next item. .Do(dontCare => {}, () => subject.OnNext(Unit.Default)); } ) .Merge(3) .Subscribe(observer)); //Queue up 3 requests for the initial kickoff disposable.Add(Observable.Repeat(Unit.Default, 3).Subscribe(subject.OnNext)); return disposable; }); 

您可能需要重新排列您发布的代码,但这是一种方法:

 var eventLoopScheduler = new EventLoopScheduler (); (from semaphore in Observable.Return(new Semaphore(2,2)) from input in GetInputObs() from getAccess in Observable.Start(() => semaphore.WaitOne(),eventLoopScheduler) from output in ProcessInputOnPipeline(input) .SubscribeOn(Scheduler.Default) .Finally(() => semaphore.Release()) select output) .Subscribe(x => Console.WriteLine(x), ex => {}); 

我已将您的管道建模为1 Observable(实际上它将由链接在一起的几个较小的可观察对象组成)

关键是要确保无论管道如何终止(空/错误)都会释放信号量,否则流可能会挂起,因此使用Finally()调用信号量上的Release()。 (如果它永远不会OnComplete()/ OnError(),那么可能值得考虑在管道observable上添加一个Timeout。

编辑:

根据下面的评论,我添加了一些关于信号量访问的调度,这样我们就不会阻止将这些输入推送到流中的人。 我使用了EventLoopScheduler,以便所有对信号量访问的请求都会排队并在1个线程上执行。

编辑:我确实更喜欢保罗的答案 – 简单,减少调度,减少同步(合并在内部使用队列)。