Tag: system.reactive

Reactive Extensions订阅呼叫等待

我想基于Reactive Extensions Observable引发的每个事件执行异步调用。 我也试图让所有内容保持同步,因为我希望在处理下一个事件之前完成异步调用。 怎么会去做类似以下的事情呢? 我说类似下面的代码不编译。 settingsChangedInMemory .Subscribe(async _ => { var settings = Extract(); await SaveSettings(settings); }); 我不确定它是否会改变任何东西,但我需要订阅多个Observable。 例如,像这样的另一个订阅。 settingsChangedOnDisk .Subscribe(async _ => { var settings = await ReadSettings(settings); Apply(settings); }); 您将如何使用Reactive Extensions执行此操作?

我怎么能等待取消订阅后在Rx可观察序列中完成所有操作?

介绍 在我的WPF C#.NET应用程序中,我使用反应式扩展(Rx)来订阅事件,我经常需要从数据库重新加载一些内容来获取更新UI所需的值,因为事件对象通常只包含ID和一些元数据。 我使用Rx调度在后台加载数据并更新调度程序上的UI。 我在Rx序列中混合“Task.Run”时遇到了一些不好的经验(当使用“SelectMany”时,不再保证顺序,并且很难控制UnitTests中的调度)。 另请参阅: 在反应流水线中执行TPL代码并通过测试调度程序控制执行 我的问题 如果我关闭我的应用程序(或关闭选项卡)我想取消订阅然后等待DB调用(从Rx“Select”调用)仍然可以在“subscription.Dispose”之后运行。 到目前为止,我还没有找到任何好的实用工具或简单方法。 问题 是否有任何框架支持等待仍在Rx链中运行的所有内容 ? 如果没有,你有什么好主意如何使一个易于使用的实用程序? 是否有任何好的替代方法来实现同样的目标? 例 public async Task AwaitEverythingInARxChain() { // In real life this is a hot observable event sequence which never completes IObservable eventSource = Enumerable.Range(1, int.MaxValue).ToObservable(); IDisposable subscription = eventSource // Load data in the background .ObserveOn(Scheduler.Default) .Select(id => LoadFromDatabase(id)) // […]

使用Reactive以并行方式运行方法

晕到那里,我一直在寻找RX框架的解决方案。 我的C#4.0类将调用2种不同的方法并节省时间,我想并行执行。 有没有办法使用Reactive Framework并行运行2种不同的方法? 不仅要并行运行这两个方法,还要等待其他方法完成并合并两个结果。 示例如下图所示: AccountClass ac = new AccountClass(); string val1 = ac.Method1(); bool val2 = ac.Method2(); 如何运行这两个方法并行运行并相互等待完成并在Subscription部分中将结果组合在一起?

如何使用响应式扩展同时读取交错文件

我是反应式扩展的新手,我想使用它(在c#中)来读取包含多个交错流的文件。 基本上该文件的格式为ABCDABCDABCD… 我更喜欢按顺序读取文件并分离流(即AAA.. , BBB..等)并并行处理每个流,每个流使用不同的线程。 必须有某种forms的缓冲,以确保每个流可以尽可能地保持忙碌(当然在限制范围内)。 并非所有流都必须同时启动,在这种情况下,必须跳过延迟流的许多元素。 在这种情况下,缓冲可能会弥补差距。 文件中的元素很小(4个字节),因此非常健谈。 因此,我也在寻找一种有效处理这种方法的方法。 我开始创建一个枚举来读取文件。 这可以用于提供包含流ID的结构,或者可以基于顺序(元素数模数量的流)来分离流。 后者可能更有效率。

非重播热观察

原始问题 我有一个场景,我有多个IObservable序列,我想与Merge结合,然后听。 但是,如果其中一个产生错误,我不希望它崩溃其他流的所有内容,以及重新订阅序列(这是一个’永久’序列)。 我通过在合并之前向流添加Retry()来执行此操作,即: IEnumerable<IObservable> observables = GetObservables(); observables .Select(o => o.Retry()) .Merge() .Subscribe(/* Do subscription stuff */); 但是,当我想测试时会出现问题。 我想测试的是,如果observables一个IObservable产生一个OnError ,那么其他的仍应该能够发送它们的值并且它们应该被处理 我以为我只使用两个Subject代表两个observables IObservable ; 一个发送OnError(new Exception()) ,另一个发送OnNext(1) 。 但是,似乎Subject将重放新订阅的所有先前值(实际上是Retry() ),将测试转换为无限循环。 我尝试通过创建一个手动IObservable来解决它,该手动IObservable在第一个订阅上产生错误,之后是一个空序列,但它感觉很hacky: var i = 0; var nErrors = 2; var testErrorObservableWithOneErrorAndThenCompletion = Observable.Create(o => { i++; if (i < nErrors) { return Observable.Throw(new Exception()).Subscribe(o); } […]

使用ObserveOn时如何处理OnNext中的exception?

当我使用ObserveOn(Scheduler.ThreadPool)时,观察者在OnNext抛出错误时,我的应用程序终止。 我发现处理这个问题的唯一方法是使用下面的自定义扩展方法(除了确保OnNext永远不会抛出exception)。 然后确保每个ObserveOn后跟一个ExceptionToError 。 public static IObservable ExceptionToError(this IObservable source) { var sub = new Subject(); source.Subscribe(i => { try { sub.OnNext(i); } catch (Exception err) { sub.OnError(err); } } , e => sub.OnError(e), () => sub.OnCompleted()); return sub; } 但是,这感觉不对。 有没有更好的方法来处理这个? 例 由于未捕获的exception,此程序崩溃。 class Program { static void Main(string[] args) { try { var […]

可以跨进程或机器边界使用Reactive Extensions(Rx)吗?

依旧记得在很久以前就看到了一些关于此的讨论,但此后一直没有听到任何消息。 所以基本上你能订阅远程机器上的IObservable吗?

如何使用Reactive Extensions使用最大窗口大小来限制事件?

场景 : 我正在构建一个UI应用程序,每隔几毫秒从后端服务获取通知。 收到新通知后,我想尽快更新用户界面。 因为我可以在很短的时间内收到大量的通知,而且我总是只关心最新的事件,所以我使用了Reactive Extensions框架的Throttle()方法。 这允许我忽略紧跟新通知的通知事件,因此我的UI保持响应。 问题: 假设我将通知事件的事件流限制为50ms,并且后端每10ms发送一次通知,Thottle()方法将永远不会返回事件,因为它会一次又一次地重置其滑动窗口。 在这里,我需要一些额外的行为来指定像超时这样的东西,这样我就可以每秒至少检索一个事件,如果有这么高的事件吞吐量。 如何使用Reactive Extensions执行此操作?

RX Scheduler – 它是什么?

我正在阅读RX并完全了解调度程序的用途? 谁能解释一下?

带有TimeSpan选择器的Observable.Generate似乎泄漏内存

我正在研究使用Observable.Generate创建一系列结果,这些结果是以msdn网站上的示例为起点,每隔一段时间采样一次。 以下代码没有TimeSpan选择器不会出现内存泄漏: IObservable obs = Observable.Generate(initialState: 1, condition: x => x x + 1, resultSelector: x => x.ToString()); obs.Subscribe(x => Console.WriteLine(x)); 但是,以下代码与TimeSpan选择器显示内存泄漏: TimeSpan timeSpan = TimeSpan.FromSeconds(1); IObservable obs = Observable.Generate(initialState: 1, condition: x => x x + 1, resultSelector: x => x.ToString(), timeSelector: x => timeSpan); obs.Subscribe(x => Console.WriteLine(x)); 例如,这个玩具应用程序将使用VS 2015社区附带的Memory Profiler快速显示内存泄漏: using System; using […]