使用Reactive Extensions重新排序事件

我正在尝试重新排序在不同线程上无序的事件。

是否可以创建匹配这些大理石图的反应式扩展查询:

s1 1 2 3 4 s2 1 3 2 4 result 1 2 3 4 

和…

 s1 1 2 3 4 s2 4 3 2 1 result 1234 

即:仅以版本号顺序发布结果。

我最接近的是每次s1时刻都使用Join打开一个窗口,只有当s2到达时才使用相同的数字关闭它。

像这样:

 var publishedEvents = events.Publish().RefCount(); publishedEvents.Join( publishedEvents.Scan(0, (i, o) => i + 1), expectedVersion => publishedEvents.Any(@event => @event.Version == expectedVersion), _ => Observable.Never(), (@event, expectedVersion) => new {@event,expectedVersion}) .Where(x => x.expectedVersion == x.@event.Version) .Select(x => x.@event) .Subscribe(Persist); 

但是这不适用于图表2,第2组将在s2标记数字2时完成,因此在1之前完成。

是否有意义? 可以用Rx完成吗? 应该是?

编辑 :我想这就像重叠窗口,后面的窗口在所有前面的窗口关闭之前无法关闭。 并且在窗口编号与事件版本号匹配之前,前面的窗口不会关闭。

编辑2

我现在有这样的东西,但它不是真正的react native,function性,线程安全的LINQ启示,我希望(请忽略我的事件现在是JObjects):

 var orderedEvents = Observable.Create(observer => { var nextVersionExpected = 1; var previousEvents = new List(); return events .ObserveOn(Scheduler.CurrentThread) .Subscribe(@event => { previousEvents.Add(@event); var version = (long) @event["Version"]; if (version != nextVersionExpected) return; foreach (var previousEvent in previousEvents.OrderBy(x => (long) x["Version"]).ToList()) { if ((long) previousEvent["Version"] != nextVersionExpected) break; observer.OnNext(previousEvent); previousEvents.Remove(previousEvent); nextVersionExpected++; } }); }); 

介绍

这个问题的关键是排序。 无论如何你看它,需要某种forms的缓冲。 毫无疑问,一些精心设计的运算符组合可能会实现这一点,我认为这是一个很好的例子,其中Observable.Create是一个不错的选择。

概括解决方案

我已经做了一些努力来概括我接受任何类型的订购密钥的方法。 要做到这一点,我希望得到:

  • 用于获取事件键的键选择器函数,类型为Func
  • TKey类型的初始键
  • 一个按顺序获取下一个键的函数,类型为Func
  • 一个结果选择器,用于从源流中的配对事件生成结果,类型为Func

由于我只是使用基于1的整数序列进行测试,因此满足以下条件:

  • keySelector: i => i
  • firstKey: 1
  • nextKeyFunc: k => k+1
  • resultSelector 🙁 (left,right) => left

分类

这是我的Sort尝试。 它将事件缓冲到一个字典中,并尽快将它们刷新到订阅者:

 public static IObservable Sort (this IObservable source, Func keySelector, TKey firstKey, Func nextKeyFunc) { return Observable.Create(o => { var nextKey = firstKey; var buffer = new Dictionary(); return source.Subscribe(i => { if (keySelector(i).Equals(nextKey)) { nextKey = nextKeyFunc(nextKey); o.OnNext(i); TSource nextValue; while (buffer.TryGetValue(nextKey, out nextValue)) { buffer.Remove(nextKey); o.OnNext(nextValue); nextKey = nextKeyFunc(nextKey); } } else buffer.Add(keySelector(i), i); }); }); } 

我不得不说这是一个非常天真的实现。 在过去的生产代码中,我已经详细阐述了具体的error handling,固定大小的缓冲区和超时以防止资源泄漏。 但是,它将用于此示例。 🙂

有了这个排序(对不起!),我们现在可以看看处理多个流。

结合结果

第一次尝试

我的第一次尝试就是产生一个无序的事件流,这些事件已经被看到了所需的次数。 然后可以对其进行排序。 我这样做是通过按键对元素进行分组,使用GroupByUntil来保存每个组,直到捕获到两个元素。 然后,每个组是相同密钥的结果流。 对于整数事件的简单示例,我可以采用每个组的最后一个元素。 但是,我不喜欢这个,因为对于更真实的场景来说,这是很尴尬的,因为每个结果流可能会贡献一些有用的东西。 为了感兴趣,我包含了代码。 注意,这样可以在第二次尝试和第二次尝试之间共享测试,我接受一个未使用的resultSelector参数:

 public static IObservable OrderedCollect (this IObservable left, IObservable right, Func keySelector, TKey firstKey, Func nextKeyFunc Func resultSelector) { return left.Merge(right) .GroupByUntil(keySelector, x => x.Take(2).LastAsync()) .SelectMany(x => x.LastAsync()) .Sort(keySelector, firstKey, nextKeyFunc); } 

旁白:您可以破解SelectMany子句来决定如何选择结果。 该解决方案在第二次尝试中具有的一个优点是,在具有许多结果流的情况下,更容易看到如何扩展它以选择说,三个结果元组中的前两个到达。

第二次尝试

对于这种方法,我独立地对每个流进行排序,然后将结果Zip在一起。 这不仅是一个简单易用的操作,而且以​​有趣的方式组合每个流的结果也更加容易。 为了使测试与我的第一种方法兼容,我选择了resultSelector函数来使用第一个流的事件作为结果,但显然你可以灵活地在你的场景中做一些有用的事情:

 public static IObservable OrderedCollect (this IObservable left, IObservable right, Func keySelector, TKey firstKey, Func nextKeyFunc, Func resultSelector) { return Observable.Zip( left.Sort(keySelector, firstKey, nextKeyFunc), right.Sort(keySelector, firstKey, nextKeyFunc), resultSelector); } 

旁白:不难看出这个代码如何扩展到接受任意数量的输入流的更一般的情况,但是如前所述,使用Zip使得在给定密钥的阻塞之前它是非常不灵活的溪流在。

测试用例

最后,这是我的测试回应您的示例场景。 要运行这些,请导入nuget包rx-testingnunit ,并将上面的实现放入静态类:

 public class ReorderingEventsTests : ReactiveTest { [Test] public void ReorderingTest1() { var scheduler = new TestScheduler(); var s1 = scheduler.CreateColdObservable( OnNext(100, 1), OnNext(200, 2), OnNext(400, 3), OnNext(500, 4)); var s2 = scheduler.CreateColdObservable( OnNext(100, 1), OnNext(200, 3), OnNext(300, 2), OnNext(500, 4)); var results = scheduler.CreateObserver(); s1.OrderedCollect( right: s2, keySelector: i => i, firstKey: 1, nextKeyFunc: i => i + 1, resultSelector: (left,right) => left).Subscribe(results); scheduler.Start(); results.Messages.AssertEqual( OnNext(100, 1), OnNext(300, 2), OnNext(400, 3), OnNext(500, 4)); } [Test] public void ReorderingTest2() { var scheduler = new TestScheduler(); var s1 = scheduler.CreateColdObservable( OnNext(100, 1), OnNext(200, 2), OnNext(300, 3), OnNext(400, 4)); var s2 = scheduler.CreateColdObservable( OnNext(100, 4), OnNext(200, 3), OnNext(300, 2), OnNext(400, 1)); var results = scheduler.CreateObserver(); s1.OrderedCollect( right: s2, keySelector: i => i, firstKey: 1, nextKeyFunc: i => i + 1, resultSelector: (left, right) => left).Subscribe(results); scheduler.Start(); results.Messages.AssertEqual( OnNext(400, 1), OnNext(400, 2), OnNext(400, 3), OnNext(400, 4)); } } 

curl以避免重复

最后的评论,因为我讨厌在代码中重复自己,这是一个调整,避免了我在第二种方法中称为Sort的重复方式。 我没有把它包含在主体内,以免混淆不熟悉curry的读者:

 public static IObservable OrderedCollect (this IObservable left, IObservable right, Func keySelector, TKey firstKey, Func nextKeyFunc, Func resultSelector) { Func, IObservable> curriedSort = events => events.Sort(keySelector, firstKey, nextKeyFunc); return Observable.Zip( curriedSort(left), curriedSort(right), resultSelector); }