RX:序列的有状态变换,例如指数移动平均

你怎么能在RX中做一个简单的,有状态的序列变换?

假设我们想要对IObservable noisySequence进行指数移动平均变换。

每当noisySequence滴答时,emaSequence应该勾选并返回值(previousEmaSequenceValue *(1-lambda)+ latestNoisySequenceValue * lambda)

我想我们使用的是Subjects,但究竟是怎么回事?

public static void Main() { var rand = new Random(); IObservable sequence = Observable .Interval(TimeSpan.FromMilliseconds(1000)) .Select(value => value + rand.NextDouble()); Func addNoise = x => x + 10*(rand.NextDouble() - 0.5); IObservable noisySequence = sequence.Select(addNoise); Subject exponentialMovingAverage = new Subject(); // ??? sequence.Subscribe(value => Console.WriteLine("original sequence "+value)); noisySequence.Subscribe(value => Console.WriteLine("noisy sequence " + value)); exponentialMovingAverage.Subscribe(value => Console.WriteLine("ema sequence " + value)); Console.ReadLine(); } 

这是您可以将状态附加到序列的方法。 在这种情况下,它计算最后10个值的平均值。

 var movingAvg = noisySequence.Scan(new List(), (buffer, value)=> { buffer.Add(value); if(buffer.Count>MaxSize) { buffer.RemoveAt(0); } return buffer; }).Select(buffer=>buffer.Average()); 

但你可以使用Window(这是一种泛化)来获得你的平均值。

 noisySequence.Window(10) .Select(window=>window.Average()) .SelectMany(averageSequence=>averageSequence); 

对于许多这类计算, Buffer是最简单的方法

 var movingAverage = noisySequence.Buffer(/*last*/ 3, /*move forward/* 1 /*at a time*/) .Select(x => (x[0] + x[1] + x[2]) / 3.0); 

如果需要携带状态,请使用Scan运算符,它与Aggregate类似,不同之处在于它每次迭代都会产生值。

谢谢! 这是使用Scan的解决方案

  const double lambda = 0.99; IObservable emaSequence = noisySequence.Scan(Double.NaN, (emaValue, value) => { if (Double.IsNaN(emaValue)) { emaValue = value; } else { emaValue = emaValue*lambda + value*(1-lambda); } return emaValue; }).Select(emaValue => emaValue);