Tag: system.reactive

为什么从给定订阅者抛出时从未调用OnError回调?

请观察以下unit testing: using System; using System.Reactive.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; namespace UnitTests { [TestClass] public class TestRx { public const int UNIT_TEST_TIMEOUT = 5000; private static IObservable GetObservable(int count = 100, int msWait = 10) { return Observable.Create(async (obs, cancellationToken) => { for (int i = 0; i { Thread.Sleep(msWait); return value; })); […]

可以从时间表列表中观察到

如果我有一个包含时间戳的对象列表 – 如何在时间戳之间的相对时间生成一个可观察到的事件? 例如。 如果我有三个带有时间戳的对象2014-01-01 10:30,2014-01-01 10:45和2014-01-01 11:30,我希望第一个事件立刻开始,接下来的15分钟后,最后一个45分钟后。 我怎样才能加快这个过程,1分钟等于1秒? 因此,第一个事件将像以前一样立即开始,但下一个事件将在15秒后而不是15分钟之后开始,依此类推。

如何正确观察非标准事件?

我是Reactive Extensions的新手,并且处理具有如下定义的事件的COM库: public delegate void MyDelegate(int requestId, double price, int amount); public event MyDelegate MyEvent; 我该如何正确观察这个? 我尝试使用Observable.FromEvent()但由于事件的参数不是EventArgs类型,我没有看到FromEvent()或FromEventPattern()是如何工作的。 我目前的解决方法是将自定义委托附加到事件然后调用Subject.OnNext()但我猜这不是我应该怎么做。 这是我当前解决方法的一个示例: MyEvent += new MyDelegate((int requestId, double price, int amount) => { Task.Run(() => { var args = new MyArgs() { requestId = requestId, price = price, amount = amount, }; this.mySubject.OnNext(args); }); });

强制刷新到Observable.Buffer c#

有没有办法强制Observable.Buffer在缓冲时间结束前刷新? 在示例中: mSubscription = mFluxObservable.Buffer(new TimeSpan(0, 0, 1, 30)).Subscribe(o => saver(o, iSessionId)); 我希望在1:30时段结束前刷新数据!

Reactive Extensions中的递归/扇出

我正在尝试拼凑一个如下工作的Rx管道: 我编写了一个函数,它接受一个I​​Observable,为我提供包含公司信息的配置文件 我查询各种数据源以查找可能相关的公司简档,所有这些都是并行的。 我将它合并到一个公司简介的IObservable中。 当我找回这些可能相关的配置文件时,我将它们与我已经观察过的配置文件进行比较,如果它们的相关性> 80%并且与我已经观察过的任何配置文件不同,我认为它们是匹配的。 我想将匹配公司反馈到第1步,以便我可以搜索这些新匹配配置文件的相关数据。 我使用一些已知的良好配置文件来引导该过程。 最终,没有更多匹配的配置文件尚未被看到,因此流程结束。 我编程时遇到了麻烦。 如果我使用Subject来允许管道的尾端将其配置文件发送到工作流的开头,那么没有人会调用OnCompleted并且我从未发现该过程已经结束。 如果我用递归来开发它,我似乎总是最终得到堆栈溢出,因为我试图用自己的返回值调用一个函数。 任何人都可以帮助我完成这项任务,我可以确定这个过程已经结束了吗?

在dispose之后的Reactive Rx 2.0 EventLoopScheduler ObjectDisposedException

我正在使用Rx 2.0的EventLoopScheduler来排队/序列化工作。 当我需要处理调度程序时,如果还有剩余的工作,我将收到一个未处理的ObjectDisposedException。 这是预期的行为吗? 人为/示例代码: EventLoopScheduler scheduler = new EventLoopScheduler(); List handles = new List(); for (int i = 0; i < 100; ++i) { var handle = Observable.Interval(TimeSpan.FromMilliseconds(1)) .ObserveOn(scheduler) .Subscribe(Observer.Create((x) => Thread.Sleep(1000))); handles.Add(handle); } Thread.Sleep(TimeSpan.FromSeconds(1)); foreach (var handle in handles) handle.Dispose(); scheduler.Dispose(); Console.ReadLine(); 堆栈跟踪: System.ObjectDisposedException at System.Reactive.Concurrency.EventLoopScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action) at System.Reactive.Concurrency.LocalScheduler.Schedule[TState](TState […]

react native管道 ​​- 如何控制并行性?

我正在构建一个简单的处理管道,其中一个项目被作为输入提取,它由多个处理器以顺序方式操作,最后输出。 下图描述了整体架构: 它当前的工作方式 :Pipeline尽可能快地从提供者那里获取项目。 一旦获取了一个项目,它就会被传递给处理器。 处理完项目后,将通知输出。 虽然以顺序方式处理单个项目,但是可以并行处理多个项目(取决于从提供者获取它们的速度)。 从管道创建并返回的IObservable如下所示: return Observable.Create(async observer => { while (_provider.HasNext) { T item = await _provider.GetNextAsync(); observer.OnNext(item); } }).SelectMany(item => Observable.FromAsync(() => _processors.Aggregate( seed: Task.FromResult(item), func: (current, processor) => current.ContinueWith( // Append continuations. previous => processor.ProcessAsync(previous.Result)) .Unwrap()))); // We need to unwrap Task{T} from Task{Task{T}}; 缺少的部分 :我需要一个控制机制来控制在任何给定时间管道中可以有多少项(最大) 。 例如,如果max并行处理为3 ,那么将导致以下工作流程: […]

在不重新评估序列的情况下获取IObservable中的上一个元素

在IObservable序列中(在.NET的Reactive Extensions中),我想获取previous和current元素的值,以便我可以比较它们。 我在网上发现了一个类似于下面的例子来完成任务: sequence.Zip(sequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur }) 它工作正常,除了它评估序列两次,我想避免。 您可以看到使用此代码对其进行了两次评估: var debugSequence = sequence.Do(item => Debug.WriteLine(“Retrieved an element from sequence”)); debugSequence.Zip(debugSequence.Skip(1), (prev, cur) => new { Previous = prev, Current = cur }).Subscribe(); 输出显示的调试行数是序列中元素的两倍。 我理解为什么会发生这种情况,但到目前为止,我还没有找到一种不会对序列进行两次评估的替代方法。 如何只将一个序列评估与前一个和当前一起组合?

订阅后添加可观察序列

我们使用Rx来监控Silverlight应用程序中的活动,以便我们可以在一段时间不活动后向用户显示消息。 我们将事件(鼠标移动等)转换为可观察对象,然后将可观察对象合并在一起以创建单个(allActivity)可观察对象。 然后,我们使用timepan限制allActivity observable,并且当系统处于非活动状态一段时间时,会预订某些通知。 如何在订阅之后为此添加一个新的observable / sequence(以便订阅选择它而不取消订阅和重新订阅)。 例如,将几个序列合并在一起,节流,订阅。 现在为已订阅的observable添加一个额外的序列。 示例代码: private IObservable allActivity; public void CreateActivityObservables(UIElement uiElement) { // Create IObservables of event types we are interested in and project them as DateTimes // These are our observables sequences that can push data to subscribers/ observers // NB: These are like IQueryables in the sense […]

如何在不使用Rx Framework的情况下限制事件的速度

我想限制一个事件的速度,如何在不使用Microsoft Rx框架的情况下实现这一目标。 我在Rx的帮助下做到了这一点。 但我正在尝试的是,我需要根据时间段限制Map的View更改事件。 是否可以在不使用Rx的情况下实现相同的function。 我不允许使用Rx,我必须保持二进制大小尽可能小。