如何使用Rx“重建从SerialPort读取的数据行”
我正在学习Rx,并试图使用SerialPort从GPS设备实现“NMEA句子阅读器”。 事实上它的GPS数据对问题的重要性不大,所以让我们澄清NMEA格式由线组成,’$’符号代表新条目的开头,所以你得到的“句子”看起来类似于:
$[data for first line goes here] $[data for second line goes here] ...
连接SerialPort的DataReceived事件非常简单:
var port = new SerialPort("COM3", 4800); var serialPortSource = Observable.FromEventPattern ( handler => port.DataReceived += handler, handler => port.DataReceived -= handler ).Select(e => port.ReadExisting());
这给出了一个IObservable
,但显然这里返回的字符串不一定是NMEA句子。 例如,对于上面的示例数据,可以得到如下序列:
$[data for first line g oes here]\r\n$[data for second line goes here]
如何将其正确地转换为一系列实际句子? 在IEnumerable
世界中,我可能会从一系列char
开始,并编写一个与此类似的扩展方法:
public static IEnumerable ToNmeaSentence( this IEnumerable characters ) { var sb = new StringBuilder(); foreach (var ch in characters) { if (ch == '$' && sb.Length > 0) { yield return sb.ToString(); sb.Clear(); } sb.Append(ch); } }
现在我想知道在Rx中这种操作是否有惯用的方法?
它与Enumerables的代码完全相同。 您使用Subscribe
而不是快速枚举,并使用observer.OnNext
而不是yield return
。 哦,你必须使用Observable.Create
,因为C#没有Observers的语言支持,就像它对Enumerables一样(但是。这不是Rx的失败)。
Enumerables和Observables完全相同。 一推,另一推。 创建它们的语法略有不同。 就这样。
public static IObservable ToNmeaSentence( this IObservable characters ) { return Observable.Create(observer => { var sb = new StringBuilder(); return characters.Subscribe(ch => { if (ch == '$' && sb.Length > 0) { observer.OnNext(sb.ToString()); sb.Clear(); } sb.Append(ch); }); }); }
我通常不会在这个低级别上进行编程,但Observables并不比Enumerables更复杂 。 当人们第一次学习Enumerables时,很难理解。 当人们第一次学习Observables时,很难理解。 他们两个做同样的事情,但一个推,一个拉。 除了那个区别之外,两者之间存在1-1的关系。
你认为Rx比Enumerables和LINQ to Objects更复杂是错误的。 当你还在学习时,它就会出现。
这是一个常见问题。 我目前的想法是你需要一个运算符,它接受一个源序列并根据谓词将其分成(连续)窗口。
这个运算符可能很有用。
public static IObservable> WindowByExclusive(this IObservable input, Func isWindowBoundary) { return Observable.Create>(o=> { var source = input.Publish().RefCount(); var left = source.Where(isWindowBoundary).Select(_=>Unit.Default).StartWith(Unit.Default); return left.GroupJoin( source.Where(c=>!isWindowBoundary(c)), x=>source.Where(isWindowBoundary), x=>Observable.Empty(), (_,window)=>window) .Subscribe(o); }); }
它基于谓词有效地将源序列分成片。
你可以通过首先将你的字符串列表变成一系列char
来使用它。 然后,您可以将WindowBy
运算符应用于c=='$'
的谓词。 这将为您提供两个包含所需数据的窗口。 现在您将数据分开,然后可以将每个窗口中的字符连接到string.Join
的字符串中。
完整的LinqPad样本
void Main() { var data = new List(){@"$[data for first line g", "oes here]\r\n$[data for ", "second line goes here]"}; data.ToObservable() .SelectMany(s=>s) .WindowByExclusive(c => c=='$') .SelectMany(window=>window.ToList().Select(l=>string.Join(string.Empty, l))) .Where(s=>!string.IsNullOrEmpty(s)) .Dump("WindowByExclusive"); } // Define other methods and classes here public static class ObEx { public static IObservable> WindowByExclusive(this IObservable input, Func isWindowBoundary) { return Observable.Create>(o=> { var source = input.Publish().RefCount(); var left = source.Where(isWindowBoundary).Select(_=>Unit.Default).StartWith(Unit.Default); return left.GroupJoin( source.Where(c=>!isWindowBoundary(c)), x=>source.Where(isWindowBoundary), x=>Observable.Empty(), (_,window)=>window) .Subscribe(o); }); } }
输出继电器:
WindowByExclusive
[data for first line goes here] [data for second line goes here]