如何使用Rx“重建从SerialPort读取的数据行”

我正在学习Rx,并试图使用SerialPort从GPS设备实现“NMEA句子阅读器”。 事实上它的GPS数据对问题的重要性不大,所以让我们澄清NMEA格式由线组成,’$’符号代表新条目的开头,所以你得到的“句子”看起来类似于:

$[data for first line goes here] $[data for second line goes here] ... 

连接SerialPort的DataReceived事件非常简单:

  var port = new SerialPort("COM3", 4800); var serialPortSource = Observable.FromEventPattern ( handler => port.DataReceived += handler, handler => port.DataReceived -= handler ).Select(e => port.ReadExisting()); 

这给出了一个IObservable ,但显然这里返回的字符串不一定是NMEA句子。 例如,对于上面的示例数据,可以得到如下序列:

 $[data for first line g oes here]\r\n$[data for second line goes here] 

如何将其正确地转换为一系列实际句子? 在IEnumerable世界中,我可能会从一系列char开始,并编写一个与此类似的扩展方法:

 public static IEnumerable ToNmeaSentence( this IEnumerable characters ) { var sb = new StringBuilder(); foreach (var ch in characters) { if (ch == '$' && sb.Length > 0) { yield return sb.ToString(); sb.Clear(); } sb.Append(ch); } } 

现在我想知道在Rx中这种操作是否有惯用的方法?

它与Enumerables的代码完全相同。 您使用Subscribe而不是快速枚举,并使用observer.OnNext而不是yield return 。 哦,你必须使用Observable.Create ,因为C#没有Observers的语言支持,就像它对Enumerables一样(但是。这不是Rx的失败)。

Enumerables和Observables完全相同。 一推,另一推。 创建它们的语法略有不同。 就这样。

 public static IObservable ToNmeaSentence( this IObservable characters ) { return Observable.Create(observer => { var sb = new StringBuilder(); return characters.Subscribe(ch => { if (ch == '$' && sb.Length > 0) { observer.OnNext(sb.ToString()); sb.Clear(); } sb.Append(ch); }); }); } 

我通常不会在这个低级别上进行编程,但Observables并不比Enumerables更复杂 。 当人们第一次学习Enumerables时,很难理解。 当人们第一次学习Observables时,很难理解。 他们两个做同样的事情,但一个推,一个拉。 除了那个区别之外,两者之间存在1-1的关系。

你认为Rx比Enumerables和LINQ to Objects更复杂是错误的。 当你还在学习时,它就会出现。

这是一个常见问题。 我目前的想法是你需要一个运算符,它接受一个源序列并根据谓词将其分成(连续)窗口。

这个运算符可能很有用。

 public static IObservable> WindowByExclusive(this IObservable input, Func isWindowBoundary) { return Observable.Create>(o=> { var source = input.Publish().RefCount(); var left = source.Where(isWindowBoundary).Select(_=>Unit.Default).StartWith(Unit.Default); return left.GroupJoin( source.Where(c=>!isWindowBoundary(c)), x=>source.Where(isWindowBoundary), x=>Observable.Empty(), (_,window)=>window) .Subscribe(o); }); } 

它基于谓词有效地将源序列分成片。

你可以通过首先将你的字符串列表变成一系列char来使用它。 然后,您可以将WindowBy运算符应用于c=='$'的谓词。 这将为您提供两个包含所需数据的窗口。 现在您将数据分开,然后可以将每个窗口中的字符连接到string.Join的字符串中。

完整的LinqPad样本

 void Main() { var data = new List(){@"$[data for first line g", "oes here]\r\n$[data for ", "second line goes here]"}; data.ToObservable() .SelectMany(s=>s) .WindowByExclusive(c => c=='$') .SelectMany(window=>window.ToList().Select(l=>string.Join(string.Empty, l))) .Where(s=>!string.IsNullOrEmpty(s)) .Dump("WindowByExclusive"); } // Define other methods and classes here public static class ObEx { public static IObservable> WindowByExclusive(this IObservable input, Func isWindowBoundary) { return Observable.Create>(o=> { var source = input.Publish().RefCount(); var left = source.Where(isWindowBoundary).Select(_=>Unit.Default).StartWith(Unit.Default); return left.GroupJoin( source.Where(c=>!isWindowBoundary(c)), x=>source.Where(isWindowBoundary), x=>Observable.Empty(), (_,window)=>window) .Subscribe(o); }); } } 

输出继电器:

WindowByExclusive

 [data for first line goes here] [data for second line goes here]