Reactive Extensions中的递归/扇出

我正在尝试拼凑一个如下工作的Rx管道:

  1. 我编写了一个函数,它接受一个I​​Observable,为我提供包含公司信息的配置文件
  2. 我查询各种数据源以查找可能相关的公司简档,所有这些都是并行的。 我将它合并到一个公司简介的IObservable中。
  3. 当我找回这些可能相关的配置文件时,我将它们与我已经观察过的配置文件进行比较,如果它们的相关性> 80%并且与我已经观察过的任何配置文件不同,我认为它们是匹配的。
  4. 我想将匹配公司反馈到第1步,以便我可以搜索这些新匹配配置文件的相关数据。

我使用一些已知的良好配置文件来引导该过程。

最终,没有更多匹配的配置文件尚未被看到,因此流程结束。

我编程时遇到了麻烦。 如果我使用Subject来允许管道的尾端将其配置文件发送到工作流的开头,那么没有人会调用OnCompleted并且我从未发现该过程已经结束。 如果我用递归来开发它,我似乎总是最终得到堆栈溢出,因为我试图用自己的返回值调用一个函数。

任何人都可以帮助我完成这项任务,我可以确定这个过程已经结束了吗?

听起来你想要一个像这样的数据流:

seed profiles --> source --> get related --> output ^ | | v -<--- transform <----- 

这似乎是解决一般问题比特定问题容易或容易的情况,因此我将提出一个通用的“反馈”函数,它应该为您提供所需的构建块:

编辑:修复function完成

 IObservable Feedback(this IObservable seed, Func> produce, Func> feed) { return Observable.Create( obs => { var ret = new CompositeDisposable(); Action partComplete = d => { ret.Remove(d); if (ret.Count == 0) obs.OnCompleted(); }; Action, Action> ssub = (o, n) => { var disp = new SingleAssignmentDisposable(); ret.Add(disp); disp.Disposable = o.Subscribe(n, obs.OnError, () => partComplete(disp)); }; Action, Action> rsub = (o, n) => { var disp = new SingleAssignmentDisposable(); ret.Add(disp); disp.Disposable = o.Subscribe(n, obs.OnError, () => partComplete(disp)); }; Action recurse = null; recurse = s => { rsub(produce(s), r => { obs.OnNext(r); ssub(feed(r), recurse); }); }; ssub(seed, recurse); return ret; }); } 

在你的情况下, TTResult似乎是相同的,所以feed将是身份函数。 produce将是用于实施步骤2和3的function。

我测试了这个函数的一些示例代码:

 void Main() { var seed = new int[] { 1, 2, 3, 4, 5, 6 }; var found = new HashSet(); var mults = seed.ToObservable() .Feedback(i => { return Observable.Range(0, 5) .Select(r => r * i) .TakeWhile(v => v < 100) .Where(v => found.Add(v)); }, i => Observable.Return(i)); using (var disp = mults.Dump()) { Console.WriteLine("Press any key to stop"); Console.ReadKey(); } Console.WriteLine("Press any key to exit"); Console.ReadKey(); } static IDisposable Dump(this IObservable source) { return source.Subscribe(item => Console.WriteLine(item), ex => Console.WriteLine("Error occurred in dump observable: " + ex.ToString()), () => Console.WriteLine("Dump completed")); }