如何关联function输入和输出?

考虑下面的简单程序。 它有一个可观察的整数和一个函数来计算最近发布的整数是偶数还是奇数。 出乎意料的是,该程序在报告数字发生变化之前报告最近的数字是偶数/奇数。

static void Main(string[] args) { int version = 0; var numbers = new Subject(); IObservable isNumberEven = numbers.Select(i => i % 2 == 0); isNumberEven .Select(i => new { IsEven = i, Version = Interlocked.Increment(ref version) }) .Subscribe(i => Console.WriteLine($"Time {i.Version} : {i.IsEven}")); numbers .Select(i => new { Number = i, Version = Interlocked.Increment(ref version) }) .Subscribe(i => Console.WriteLine($"Time {i.Version} : {i.Number}")); numbers.OnNext(1); numbers.OnNext(2); numbers.OnNext(3); Console.ReadLine(); } 

输出是:

 Time 1 : False Time 2 : 1 Time 3 : True Time 4 : 2 Time 5 : False Time 6 : 3 

我认为改变这个数字会引发一系列下游效应,这些将在他们发生的命令中报告。 交换订阅订单将交换报告结果的方式。 我知道rx是异步的,事情可能以非确定性的顺序发生。 如果我在我的函数中使用.Delay()或Web调用,我无法确定何时会报告结果。 但在这种情况下,我很惊讶。

为什么这是一个大问题? 我认为这意味着如果我想尝试关联函数输入和输出(比如打印数字,因为它们是偶数还是奇数),我必须在输出结果中包含输入参数,如下所示:

 var isNumberEven = numbers.Select(i => new { Number = i, IsEven = i % 2 == 0 }); 

我想我可以构建一堆小的简单函数,然后使用rx运算符组合它们来完成复杂的计算。 但也许我不能使用rx运算符来组合/连接/关联结果。 当我定义每个函数时,我必须自己关联输入和输出。

在某些情况下,我可以使用rx运算符来关联结果。 如果每个输入产生一个输出,我可以压缩两个。 但是只要你做了类似Throttle的输入它就不再起作用了。

这个版本的程序似乎确实以合理的方式报告数字是偶数还是奇数。

 static void Main(string[] args) { var numbers = new Subject(); var isNumberEven = numbers.Select(i => i % 2 == 0); var publishedNumbers = numbers.Publish().RefCount(); var report = publishedNumbers .GroupJoin( isNumberEven, (_) => publishedNumbers, (_) => Observable.Empty(), (n, e) => new { Number = n, IsEven = e }) .SelectMany(i => i.IsEven.Select(j => new { Number = i.Number, IsEven = j })); report.Subscribe(i => Console.WriteLine($"{i.Number} {(i.IsEven ? "even" : "odd")}")); numbers.OnNext(1); numbers.OnNext(2); numbers.OnNext(3); Console.ReadLine(); } 

输出如下:

 1 odd 2 even 3 odd 

但我不知道这是否是一个幸运的巧合,或者我是否可以依赖它。 Rx中的哪些操作以确定性顺序发生? 哪些是不可预测的? 我应该定义所有函数以在结果中包含输入参数吗?

你的第一个程序的行为完全符合我的预期,并且确定性如此。

我知道rx是异步的,事情可能以非确定性的顺序发生。

如果引入非确定性行为(如并发/调度),则事件仅以非确定性顺序发生,否则Rx是确定性的。

这里有几个问题/误解。 1)可变外部状态 – version 2)使用主题(但在此示例中根本不是问题)3)对如何发出回调的误解。

让我们只关注3) 。 如果我们为您提供代码并将其打包到其基本的callite,您可能会看到Rx有多么简单。

numbers.OnNext(1); 主题将按照订阅的顺序查找它们的订阅和OnNext

 IObservable isNumberEven = numbers.Select(i => i % 2 == 0); isNumberEven .Select(i => new { IsEven = i, Version = Interlocked.Increment(ref version) }) .Subscribe(i => Console.WriteLine($"Time {i.Version} : {i.IsEven}")); 

也可以减少到

 numbers.Select(i => i % 2 == 0) .Select(i => new { IsEven = i, Version = Interlocked.Increment(ref version) }) .Subscribe(i => Console.WriteLine($"Time {i.Version} : {i.IsEven}")); 

有人可能会争辩说,因为isNumberEven从未在其他任何地方使用过,所以你应该把它减少到这个。

所以我们可以看到我们有第一个订户。 而且它运行的代码实际上是这样的

 private void HandleOnNext(int i) { var isEven = i % 2 == 0 var temp = new { IsEven = isEven , Version = Interlocked.Increment(ref version) }; Console.WriteLine($"Time {temp .Version} : {temp .IsEven}"); } 

我们的第二个订阅者(因为.Subscribe(方法是在偶数订阅后调用的)是numbers订阅者。他的代码可以有效地归结为

 private void HandleOnNext(int i) { var temp = new { Number = i, Version = Interlocked.Increment(ref version) }; Console.WriteLine($"Time {temp.Version} : {temp.Number}"); } 

所以一旦你完全解构了代码,你最终会得到这个

 void Main() { int version = 0; //numbers.OnNext(1); ProcessEven(1, ref version); ProcessNumber(1, ref version); //numbers.OnNext(2); ProcessEven(2, ref version); ProcessNumber(2, ref version); //numbers.OnNext(3); ProcessEven(3, ref version); ProcessNumber(3, ref version); } // Define other methods and classes here private void ProcessEven(int i, ref int version) { var isEven = i % 2 == 0; var temp = new { IsEven = isEven, Version = Interlocked.Increment(ref version) }; Console.WriteLine($"Time {temp.Version} : {temp.IsEven}"); } private void ProcessNumber(int i, ref int version) { var temp = new { Number = i, Version = Interlocked.Increment(ref version) }; Console.WriteLine($"Time {temp.Version} : {temp.Number}"); } 

一旦所有回调和订阅都被确定,那么你可以看到这不是神奇的事情,一切都是确定性的。

我应该定义所有函数以在结果中包含输入参数吗?

为了回答你的问题(考虑到你对Rx的误解,我犹豫不决),你只需要在结果序列的顺序是非确定性的时候这样做。 例如,如果您一次发出多个Web请求,则可能是这样。 您无法确定他们是否会按照您发送的顺序回复。 但是,您可以强制这些场景与Concat等运算符的使用一致