Tag: system.reactive

RX:序列的有状态变换,例如指数移动平均

你怎么能在RX中做一个简单的,有状态的序列变换? 假设我们想要对IObservable noisySequence进行指数移动平均变换。 每当noisySequence滴答时,emaSequence应该勾选并返回值(previousEmaSequenceValue *(1-lambda)+ latestNoisySequenceValue * lambda) 我想我们使用的是Subjects,但究竟是怎么回事? public static void Main() { var rand = new Random(); IObservable sequence = Observable .Interval(TimeSpan.FromMilliseconds(1000)) .Select(value => value + rand.NextDouble()); Func addNoise = x => x + 10*(rand.NextDouble() – 0.5); IObservable noisySequence = sequence.Select(addNoise); Subject exponentialMovingAverage = new Subject(); // ??? sequence.Subscribe(value => Console.WriteLine(“original sequence “+value)); […]

在WPF中使用Unity解析时,SynchronizationContext.Current为null

我有一个看起来像这样的WPF代码。 public class AlphaProductesVM : BaseModel { private ObservableCollection _NwCustomers; private int i = 0; public AlphaProductesVM () { _NwCustomers = new ObservableCollection(); var repository = new NorthwindRepository(); repository .GetAllProducts() .ObserveOn(SynchronizationContext.Current) .Subscribe(AddElement); } public void AddElements(IEnumerable elements) { foreach (var alphabeticalListOfProduct in elements) { AddElement(alphabeticalListOfProduct); } } public ObservableCollection NwCustomers { get { return _NwCustomers; […]

如何在一段时间内对一个流进行分区(GroupBy)并监视Rx中元素的缺失?

前几天我一直在尝试编写一个Rx查询来处理来自源的事件流,并检查是否缺少某些ID。 定义缺席使得存在一系列时间窗口(例如,在从9:00到17:00的所有日期),在该时间窗口期间应该存在最多20分钟而没有在流中出现ID。 更复杂的是,应根据身份certificate确定缺勤时间。 例如,假设出现在事件的组合流(A,A,B,C,A,C,B等)中的三种事件A,B和C,可以定义为 活动将在每天的9:00至10:00进行监控,最多不超过10分钟。 B事件每天从9:00到11:00进行监控,最多没有事件为5分钟。 C事件的监控时间为每天的12:00至15:00,最多没有事件为30分钟。 我想我需要首先对流进行分区以通过GroupBy分离事件,然后使用缺少规则处理生成的单独流。 我已经在Microsoft Rx论坛上对此进行了一些考虑(非常感谢Dave),我有一些工作代码可以生成规则并进行缺失检查,但是我很挣扎,例如,如何将其与分组相结合。 所以,没有进一步的演讲,迄今为止被黑客攻击的代码: //Some sample data bits representing the events. public class FakeData { public int Id { get; set; } public string SomeData { get; set; } } //Note the Now part in DateTime to zero the clock time and have only the date. The purpose […]

抛出EventLoopScheduler后RX2.0:ObjectDisposedException

我们最近将系统从RX 1.11111移植到RX 2.0并发现了这个问题。 我们对ObserveOn使用EventLoopScheduler,如下所示: IDisposable subscription = someSubject .ObserveOn(m_eventLoopScheduler) .SomeMoreRXFunctions() .Subscribe((something)=>something) 调度程序m_eventLoopScheduler.Dispose在应用程序出口( m_eventLoopScheduler.Dispose )上。 在此之前,我们处理observable( subscription.Dispose )的所有subscription.Dispose 。 尽管如此,我们在EventLoopScheduler.Schedule中得到了一个ObjectDisposedException 。 捕获该exception是不可能的,因为它起源于RX线程。 这几乎就像Dispose没有摆脱某些队列中的所有项目。 我们试图删除对EventLoopScheduler.Dispose的调用,exception消失了。 但是, SomeMoreRXFunctions()的代码执行了大约10次,尽管所有订阅都被处理掉了。 还有其他方法可以正确关闭EventLoopScheduler吗?

使用TcpClient和Reactive Extensions从Stream读取连续字节流

请考虑以下代码: internal class Program { private static void Main(string[] args) { var client = new TcpClient(); client.ConnectAsync(“localhost”, 7105).Wait(); var stream = client.GetStream(); var observable = stream.ReadDataObservable().Repeat(); var s = from d in observable.Buffer(4) let headerLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt16(d.ToArray(), 2)) let b = observable.Take(headerLength) select b.ToEnumerable().ToArray(); s.Subscribe(a => Console.WriteLine(“{0}”, a)); Console.ReadLine(); } } public static class Extensions […]

为什么每个观察委托都在一个新线程上运行

在Rx中使用Scheduler.NewThread进行ObserveOn方法时,当Rx已经保证OnNexts永远不会重叠时,让每个Observation委托(OnNext)在新线程上运行的优点是什么。 如果要逐个调用每个OnNext,为什么需要为每个OnNext创建新线程。 我理解为什么一个人想在一个不同于订阅和应用程序线程的线程上运行Observation委托,但是当它们永远不会并行运行时,在新线程上运行每个观察委托?….对我来说没有意义我在这里遗漏了什么? 例如 using System; using System.Linq; using System.Reactive.Concurrency; using System.Reactive.Linq; using System.Threading; namespace RxTesting { class Program { static void Main(string[] args) { Console.WriteLine(“Application Thread : {0}”, Thread.CurrentThread.ManagedThreadId); var numbers = from number in Enumerable.Range(1,10) select Process(number); var observableNumbers = numbers.ToObservable() .ObserveOn(Scheduler.NewThread) .SubscribeOn(Scheduler.NewThread); observableNumbers.Subscribe( n => Console.WriteLine(“Consuming : {0} \t on Thread : […]

Observable.ObserveOn()似乎没有效果

我试图使用Rx并行处理项目。 似乎我不能告诉Rx并行运行我的观察者的OnNext()。 这是测试代码来演示 [Test] public void ObservableObserveOnNewThreadRunsInParallel() { Console.WriteLine(“Starting thread: {0}”, Thread.CurrentThread.ManagedThreadId); // store items as they are output var list = new List<Tuple>(); // used to wait until a sequences are complete var ev = new AutoResetEvent(false); // try these schedulers var schedulers = new[] { Tuple.Create(“ThreadPoolScheduler.Instance”, (IScheduler)ThreadPoolScheduler.Instance), Tuple.Create(“NewThreadScheduler.Default”, (IScheduler)NewThreadScheduler.Default), Tuple.Create(“TaskPoolScheduler.Default”, (IScheduler)TaskPoolScheduler.Default), Tuple.Create(“Scheduler.Default”, (IScheduler)Scheduler.Default), […]

如何连接多个IObservable序列?

var a = Observable.Range(0, 10); var b = Observable.Range(5, 10); var zip = a.Zip(b, (x, y) => x + “-” + y); zip.Subscribe(Console.WriteLine); 打印 0 – 5 1 – 6 2 – 7 … 相反,我想加入相同的价值观 5 – 5 6 – 6 7 – 7 8 – 8 … 这是合并100个有序异步序列的问题的简化示例。 加入两个IEnumerable非常容易,但我找不到在Rx中做这样的事情的方法。 有任何想法吗? 更多关于投入和我想要实现的目标。 基本上,整个系统是一个实时管道,具有通过fork-join模式连接的多个状态机(聚合器,缓冲区,平滑filter等)。 RX是否适合实现这些东西? 每个输入都可以表示为 […]

Rx扩展:Parallel.ForEach在哪里?

我有一段使用Parallel.ForEach的代码,可能基于旧版本的Rx扩展或任务并行库。 我安装了当前版本的Rx扩展,但找不到Parallel.ForEach 。 我没有使用该库的任何其他花哨的东西,只是想像这样并行处理一些数据: Parallel.ForEach(records, ProcessRecord); 我发现了这个问题 ,但我不想依赖旧版本的Rx。 但是我无法为Rx找到类似的东西,那么使用当前Rx版本的当前和最直接的方法是什么? 该项目使用的是.NET 3.5。

Rx后退并重试

这基于此SO中提供的代码: 编写Rx“RetryAfter”扩展方法 我正在使用Markus Olsson的代码(目前仅进行评估),之前有人问我试图在Github上抓住Markus,但是我在工作的地方被封锁了,所以我觉得我唯一能做的就是问在这里。 很抱歉,如果这与任何人都很糟糕。 所以我在一个小的演示中使用以下代码: class Attempt1 { private static bool shouldThrow = true; static void Main(string[] args) { Generate().RetryWithBackoffStrategy(3, MyRxExtensions.ExponentialBackoff, ex => { return ex is NullReferenceException; }, Scheduler.TaskPool) .Subscribe( OnNext, OnError ); Console.ReadLine(); } private static void OnNext(int val) { Console.WriteLine(“subscriber value is {0} which was seen on threadId:{1}”, val, Thread.CurrentThread.ManagedThreadId); } […]