如何获得N hot Observable 实例的“最后”项的总和?

编辑:2013年9月15日 – 我正在描述我的情景进一步分解为帮助每个人更好地了解我的情况的步骤。 添加了整个应用程序的源代码以供下载。 如果要跳转到原始问题,请向下滚动到最后一个标题。 请让我知道这些问题。 谢谢

摘要

阿拉斯加州首府朱诺有一个AST(阿拉斯加州警官)总部大楼,他们希望在这个大屏幕上显示一个自动显示和更新的单个号码。 该号码称为(犯罪商数指数)或CQI

CQI基本上是一个计算出的数字,用于显示当前的国家犯罪情况……

如何计算?

运行屏幕的程序是一个.NET WPF应用程序,它通过Hot IObservable流不断接收CrimeReport对象。

每个城市计算CQI,然后采用所有城市的Sum(),称为国家CQI以下是计算国家CQI的步骤

第1步 – 接收犯罪数据

每次报告犯罪时,CrimeReport都会发送到.NET应用程序。 它具有以下组件

犯罪日期时间

– 市/县管辖

SeverityLevel – 严肃/非严重

EstimatedSolveTime – AST确定解决犯罪所需的估计天数。

所以在这一步中,我们订阅了IObservable并创建了MainViewModel的实例

IObservable reportSource = mainSource.Publish(); MainVM = new MainViewModel(reportSource); reportSource.Connect(); 

第2步 – 逐个城市,每个城市做数学

当您收到报告时,请按城市分组

 var cities = reportSource.GroupBy(k => k.City) .Select(g => new CityDto(g.Key, g); 

CityDto是一个DTO课程,它接收当前城市的所有报告并计算城市的CQI。

城市CQI的计算通过以下公式完成

如果严重犯罪总数与非严重犯罪总数之比小于1

然后

城市的CQI =比率x估计求解时间的最小值

其他

城市的CQI =比率x最大估计求解时间

这是CityDto的类定义

 internal class CityDto { public string CityName { get; set; } public IObservable CityCqi {get; set;} public CityDto(string cityName, IObservable cityReports) { CityName = cityName; // Get all serious and non serious crimes // var totalSeriousCrimes = cityReports.Where(c => c.Severity == CrimeSeverity.Serious) .Scan(0, (p, _) => p++); var totalnonSeriousCrimes = cityReports.Where(c => c.Severity == CrimeSeverity.NonSerious) .Scan(0, (p, _) => p++); // Get the ratio // var ratio = Observable.CombineLatest(totalSeriousCrimes, totalnonSeriousCrimes, (s, n) => n == 0? s : s/n); // Avoding DivideByZero here // Get the minimum and maximum estimated solve time // var minEstimatedSolveTime = cityReports.Select(c => c.EstimatedSolveTime) .Scan(5000, (p, n) => n 

c.EstimatedSolveTime) .Scan(0, (p, n) => n > p? n : p); //Time for the City's CQI // CityCqi = Observable.CombineLatest(ratio, minEstimatedSolveTime, maxEstimatedSolveTime, (r, n, x) => r < 1.0? r * n : r * m); } }

现在我们有城市DTO对象维护城市的CQI值并通过IObservable暴露实时CQI,阿拉斯加州的Capital希望Sum()将所有城市的CQI显示为阿拉斯加的CQI并将其显示在屏幕上并且每个在参与CQI计划的城市/县的任何地方报告的犯罪应立即对国家的CQI产生影响

第3步 – 为州填写城市数据

现在我们必须计算整个州的CQI,它是大屏幕上的实时更新,我们有State的视图模型,名为MainViewModel

 internal class MainViewModel { public MainViewModel(IObservable mainReport) { /// Here is the snippet also mentioned in Step 2 // var cities = mainReport.GroupBy(k => k.City) .Select(g => new CityDto(g.Key, g)); ///// T his ///// Is //// Where //// I /// am /// Stuck // var allCqis = cities.Select(c => c.CityCqi); // gives you IObservable<IObservable> , /// Need to use latest of each observable in allCqi and sum them up //// How do I do it ? } } 

约束

  • 并非所有阿拉斯加城市都参与州的CQI计划,但城市正在日复一日地注册,所以我不能拥有List并且无论入学情况如何都无法添加所有城市。 因此,IObservable只维护那些不仅参与的城市,而且还发送了至少一个CrimeReport对象。

完整源代码

点击此处即可下载来源

原来问的问题

我有一个热的可观察的多个热观察点……

 IObservable<IObservable> 

我想要一个可观察的,当订阅时会让观察者知道里面所有可观察者的所有“最新”十进制数的总和。

我怎样才能做到这一点? 我尝试过CombineLatest(…)但是无法正确使用它。

谢谢

Rxx库有一个CombineLatest()的重载,它接受一个IObservable> 。 如果您使用此过载,则解决方案很简单:

 var runningSum = allCqis .Select(cqi => cqi.StartWith(0)) // start each inner sequence off with 0 .CombineLatest() // produces an IObservable> .Select(cqis => cqis.Sum()); // LINQ operator Sum(IEnumerable) 

查看Rxx.CombineLatest的源代码可能有助于了解问题如何在“引擎盖下”解决

很多问题! 也许是时候了解你的Rx技巧? 几乎所有最近的问题都在我的网站IntroToRx.com中介绍 。 深入了解Rx,可以让您比在论坛上提问更快地回答这些相当简单的问题。 你应该能在不到3天的时间内阅读这本书。

无论如何….

1你想要一笔运行金额还是最后一笔金额?

2那么,您是否想要所有流的所有值或每个流的总和的总和?

要获取序列的单个和值,请使用.Sum()运算符。 http://introtorx.com/Content/v1.0.10621.0/07_Aggregation.html#MaxAndMin

要获得运行总计,请使用“ Scan运算符。 http://introtorx.com/Content/v1.0.10621.0/07_Aggregation.html#Scan

所以答案可能是这样的(未经测试):

 sources.Select(source=>source.Scan(0m, (acc, value)=>acc+=value)).Merge();