Reactive Extensions:订阅者中的并发

我试图围绕Reactive Extensions对并发性的支持,并且很难获得我追求的结果。 所以我可能还没有得到它

我有一个源,它将数据发送到流中的速度比订阅者可以使用它的速度快。 我更喜欢配置流,以便使用另一个线程从流中为每个新项调用订阅者,以便订阅者同时运行多个线程。 我能够确保订户的线程安全性。

以下示例演示了此问题:

Observable.Interval( TimeSpan.FromSeconds(1)) .Do( x => Console.WriteLine("{0} Thread: {1} Source value: {2}", DateTime.Now, Thread.CurrentThread.ManagedThreadId, x)) .ObserveOn(NewThreadScheduler.Default) .Subscribe(x => { Console.WriteLine("{0} Thread: {1} Observed value: {2}", DateTime.Now, Thread.CurrentThread.ManagedThreadId, x); Thread.Sleep(5000); // Simulate long work time }); 

控制台输出如下所示(删除日期):

 4:25:20 PM Thread: 6 Source value: 0 4:25:20 PM Thread: 11 Observed value: 0 4:25:21 PM Thread: 12 Source value: 1 4:25:22 PM Thread: 12 Source value: 2 4:25:23 PM Thread: 6 Source value: 3 4:25:24 PM Thread: 6 Source value: 4 4:25:25 PM Thread: 11 Observed value: 1 4:25:25 PM Thread: 12 Source value: 5 4:25:26 PM Thread: 6 Source value: 6 

请注意“观察值”时间增量。 即使源继续以比订户可以处理它更快的速度发送数据,也不会并行调用订户。 虽然我可以想象当前行为有用的一系列场景,但我需要能够在消息可用时立即处理它们。

我已经使用ObserveOn方法尝试了几种Scheduler变体,但它们似乎都没有做我想要的。

除了在Subscribe操作中分离一个线程以执行长时间运行的工作之外,还有什么我遗漏的东西可以同时向订阅者传送数据吗?

提前感谢所有的答案和建议!

这里的基本问题是你希望Rx observable以一种真正违反可观察量工作规则的方式调度事件。 我认为在这里查看Rx设计指南是有益的: http : //go.microsoft.com/fwlink/?LinkID = 205219 – 最值得注意的是,“4.2假设观察者实例以序列化方式调用”。 即你并不意味着能够并行运行OnNext调用。 事实上,Rx的排序行为对其设计理念至关重要。

如果查看源代码,您会看到Rx在ScheduledObserver类中强制执行此行为, ObserveOnObserver派生ObserveOnObserver …从内部队列调度OnNexts,每个必须在下一个之前完成调度 – 在给定的执行上下文中。 Rx不允许单个订户的OnNext调用同时执行。

这并不是说你不能让多个子纤维以不同的速率执行。 事实上,如果您按如下方式更改代码,则很容易看到:

 var source = Observable.Interval(TimeSpan.FromSeconds(1)) .Do(x => Console.WriteLine("{0} Thread: {1} Source value: {2}", DateTime.Now, Thread.CurrentThread.ManagedThreadId, x)) .ObserveOn(NewThreadScheduler.Default); var subscription1 = source.Subscribe(x => { Console.WriteLine("Subscriber 1: {0} Thread: {1} Observed value: {2}", DateTime.Now, Thread.CurrentThread.ManagedThreadId, x); Thread.Sleep(1000); // Simulate long work time }); var subscription2 = source.Subscribe(x => { Console.WriteLine("Subscriber 2: {0} Thread: {1} Observed value: {2}", DateTime.Now, Thread.CurrentThread.ManagedThreadId, x); Thread.Sleep(5000); // Simulate long work time }); 

现在,您将看到订阅服务器1超越订阅服务器2。

你不能轻易做的就是让一个观察者做一些事情,比如向一个“准备好”的用户发送一个OnNext呼叫 – 这就是你要求的迂回方式。 我还假设你不想在消费者缓慢的情况下为每个OnNext创建一个新线程!

在这种情况下,听起来你最好只使用一个订阅者,除了尽可能快地将工作推送到队列之外什么都不做,然后由许多消耗的工作线程提供服务,然后你可以根据需要进行控制步伐。