如何连接多个IObservable序列?

var a = Observable.Range(0, 10); var b = Observable.Range(5, 10); var zip = a.Zip(b, (x, y) => x + "-" + y); zip.Subscribe(Console.WriteLine); 

打印
0 – 5
1 – 6
2 – 7

相反,我想加入相同的价值观
5 – 5
6 – 6
7 – 7
8 – 8

这是合并100个有序异步序列的问题的简化示例。 加入两个IEnumerable非常容易,但我找不到在Rx中做这样的事情的方法。 有任何想法吗?

更多关于投入和我想要实现的目标。 基本上,整个系统是一个实时管道,具有通过fork-join模式连接的多个状态机(聚合器,缓冲区,平滑filter等)。 RX是否适合实现这些东西? 每个输入都可以表示为

 public struct DataPoint { public double Value; public DateTimeOffset Timestamp; } 

每个输入的数据位在到达时被加上时间戳,因此所有事件通过它们的连接键(时间戳)自然地排序。 当事件通过管道传播时,它们会分叉并加入。 联接需要通过时间戳关联并按预定义顺序应用。 例如,join(a,b,c,d)=> join(join(join(a,b),c),d)。

编辑以下是我能赶紧提出的内容。 希望有一个基于现有Rx运算符的更简单的解决方案。

 static void Test() { var a = Observable.Range(0, 10); var b = Observable.Range(5, 10); //var zip = a.Zip(b, (x, y) => x + "-" + y); //zip.Subscribe(Console.WriteLine); var joined = MergeJoin(a,b, (x,y) => x + "-" + y); joined.Subscribe(Console.WriteLine); } static IObservable MergeJoin(IObservable left, IObservable right, Func selector) { return Observable.CreateWithDisposable(o => { Queue a = new Queue(); Queue b = new Queue(); object gate = new object(); left.Subscribe(x => { lock (gate) { if (a.Count == 0 || a.Peek() < x) a.Enqueue(x); while (a.Count != 0 && b.Count != 0) { if (a.Peek() == b.Peek()) { o.OnNext(selector(a.Dequeue(), b.Dequeue())); } else if (a.Peek()  { lock (gate) { if (b.Count == 0 || b.Peek() < x) b.Enqueue(x); while (a.Count != 0 && b.Count != 0) { if (a.Peek() == b.Peek()) { o.OnNext(selector(a.Dequeue(), b.Dequeue())); } else if (a.Peek() < b.Peek()) { a.Dequeue(); } else { b.Dequeue(); } } } }); return Disposable.Empty; }); 

老实说,我不能想到基于现有运算符的解决方案,这些运算符适用于未知顺序的热源(即xs before ys vs xs before ys )。 你的解决方案似乎很好(嘿,如果它有效),但如果它是我的代码,我会做一些改变:

  • 使用MutableDisposableCompositeDisposable正确支持取消
  • 对从选择器抛出的exception调用OnError (使其与其他运算符更加一致)
  • 如果一个源可以在另一个源之前完成,请考虑支持完成

下面的代码已经过双范围输入测试,翻转了相同的输入,以及Empty + Never

 public static IObservable MergeJoin( IObservable left, IObservable right, Func selector) { return Observable.CreateWithDisposable(o => { Queue a = new Queue(); Queue b = new Queue(); object gate = new object(); bool leftComplete = false; bool rightComplete = false; MutableDisposable leftSubscription = new MutableDisposable(); MutableDisposable rightSubscription = new MutableDisposable(); Action tryDequeue = () => { lock (gate) { while (a.Count != 0 && b.Count != 0) { if (a.Peek() == b.Peek()) { string value = null; try { value = selector(a.Dequeue(), b.Dequeue()); } catch (Exception ex) { o.OnError(ex); return; } o.OnNext(value); } else if (a.Peek() < b.Peek()) { a.Dequeue(); } else { b.Dequeue(); } } } }; leftSubscription.Disposable = left.Subscribe(x => { lock (gate) { if (a.Count == 0 || a.Peek() < x) a.Enqueue(x); tryDequeue(); if (rightComplete && b.Count == 0) { o.OnCompleted(); } } }, () => { leftComplete = true; if (a.Count == 0 || rightComplete) { o.OnCompleted(); } }); rightSubscription.Disposable = right.Subscribe(x => { lock (gate) { if (b.Count == 0 || b.Peek() < x) b.Enqueue(x); tryDequeue(); if (rightComplete && b.Count == 0) { o.OnCompleted(); } } }, () => { rightComplete = true; if (b.Count == 0 || leftComplete) { o.OnCompleted(); } }); return new CompositeDisposable(leftSubscription, rightSubscription); }); } 

GroupBy可能会做你需要的。 似乎你没有时间限制什么时候项目“加入”,你只需要以某种方式将类似的项目放在一起。

 Observable.Merge(Observable.Range(1, 10), Observable.Range(5, 15)) .GroupBy(k => k) .Subscribe( go => go.Count().Where(cnt => cnt > 1) .Subscribe(cnt => Console.WriteLine("Key {0} has {1} matches", go.Key, cnt))); 

关于上述两点需要注意的事项,Merge有以下重载,因此您的req有数百个连接的流不会出现问题:

 Merge(params IObservable[] sources); Merge(this IEnumerable> sources); Merge(this IObservable> source); 

此外, GroupBy返回IObservable> ,这意味着您可以对每个组以及每个组中的每个新成员作出反应 – 无需等到所有组完成。

这个答案是从Rx论坛复制的,只是为了在这里存档:

 var xs = Observable.Range(1, 10); var ys = Observable.Range(5, 10); var joined = from x in xs from y in ys where x == y select x + "-" + y; 

或者不使用查询表达式:

 var joined = xs.SelectMany(x => ys, (x, y) => new {x, y}) .Where(t => tx == ty) .Select(t => tx + "-" + ty); 

如何在v.2838中使用新的Join运算符。

 var a = Observable.Range(1, 10); var b = Observable.Range(5, 10); var joinedStream = a.Join(b, _ => Observable.Never(), _ => Observable.Never(), (aOutput, bOutput) => new Tuple(aOutput, bOutput)) .Where(tupple => tupple.Item1 == tupple.Item2); joinedStream.Subscribe(output => Trace.WriteLine(output)); 

这是我第一次看到Join ,我不确定使用像这样的Never运算符是否明智。 当处理大量投入时,由于它产生了大量的操作,所以更多的投入被撤销。 我认为可以做的工作是在制作matche时关闭窗口并使解决方案更有效率。 这就是说上面的例子按你的问题工作。

为了记录,我认为斯科特的答案可能是这种情况下的方法。 我只是把它作为一种潜在的选择。