Reactive Framework(RX)和异步处理事件

所以我只是在玩RX并学习它。 我开始玩事件,并想知道如何订阅事件,并异步处理批量结果。 请允许我用代码解释:

引发事件的简单类:

public class EventRaisingClass { public event EventHandler EventOccured; //some other code that raises event... } public class SomeEventArgs : EventArgs { public SomeEventArgs(int data) { this.SomeArg = data; } public int SomeArg { get; private set; } } 

然后我的主要:

 public static void Main(string[] args) { var eventRaiser = new EventRaisingClass(); IObservable<IEvent> observable = Observable.FromEvent(e => eventRaiser.EventOccured += e, e => eventRaiser.EventOccured -= e); IObservable<IList<IEvent>> bufferedEvents = observable.BufferWithCount(100); //how can I subscribte to bufferedEvents so that the subscription code gets called Async? bufferedEvents.Subscribe(list => /*do something with list of event args*/); //this happens synchrounously... } 

正如您在我的评论中所看到的,当您只是调用订阅时,所有订阅代码都会同步发生。 是否有一种开箱即用的方法使用RX,只要有新的事件要处理,就可以在不同的线程上调用Subscribe?

 bufferedEvents.ObserveOn(Scheduler.TaskPool).Subscribe(... 

SubscribeOn用于指定所谓的“ 订阅副作用 ”正在发生的时间表。 例如,您的observable可以在每次订阅时打开文件。

ObserveOn用于指定每当有新值时对观察者的调用将发生的调度。 实际上,它比SubscribeOn更常用。

我相信你正在寻找SubscribeOnObserveOn ,通过一个ISchedulerSystem.Concurrency下内置了几个调度程序; 其中一些使用当前的任何线程,而其他线程使用特定的线程。

此video提供了有关调度程序概念的更多信息。

Rx团队最近还发布了一个动手实验室文档,这是目前最接近教程的文档。