使用C#反序列化Avro文件
我找不到用C#反序列化Apache Avro文件的方法。 Avro文件是Microsoft Azure Event Hub中的存档function生成的文件。
使用Java,我可以使用Apache的Avro Tools将文件转换为JSON:
java -jar avro-tools-1.8.1.jar tojson --pretty inputfile > output.json
使用NuGet包Microsoft.Hadoop.Avro我能够提取SequenceNumber
, Offset
和EnqueuedTimeUtc
,但由于我不知道Body
使用什么类型,因此抛出exception。 我尝试过使用Dictionary
和其他类型。
static void Main(string[] args) { var fileName = "..."; using (Stream stream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read)) { using (var reader = AvroContainer.CreateReader(stream)) { using (var streamReader = new SequentialReader(reader)) { var record = streamReader.Objects.FirstOrDefault(); } } } } [DataContract(Namespace = "Microsoft.ServiceBus.Messaging")] public class EventData { [DataMember(Name = "SequenceNumber")] public long SequenceNumber { get; set; } [DataMember(Name = "Offset")] public string Offset { get; set; } [DataMember(Name = "EnqueuedTimeUtc")] public string EnqueuedTimeUtc { get; set; } [DataMember(Name = "Body")] public foo Body { get; set; } // More properties... }
架构如下所示:
{ "type": "record", "name": "EventData", "namespace": "Microsoft.ServiceBus.Messaging", "fields": [ { "name": "SequenceNumber", "type": "long" }, { "name": "Offset", "type": "string" }, { "name": "EnqueuedTimeUtc", "type": "string" }, { "name": "SystemProperties", "type": { "type": "map", "values": [ "long", "double", "string", "bytes" ] } }, { "name": "Properties", "type": { "type": "map", "values": [ "long", "double", "string", "bytes" ] } }, { "name": "Body", "type": [ "null", "bytes" ] } ] }
我能够使用dynamic
获得完整的数据访问。 这是访问原始body
数据的代码,它存储为一个字节数组。 在我的例子中,这些字节包含UTF8编码的JSON,但当然这取决于您最初创建发布到事件中心的EventData
实例的方式:
using (var reader = AvroContainer.CreateGenericReader(stream)) { while (reader.MoveNext()) { foreach (dynamic record in reader.Current.Objects) { var sequenceNumber = record.SequenceNumber; var bodyText = Encoding.UTF8.GetString(record.Body); Console.WriteLine($"{sequenceNumber}: {bodyText}"); } } }
如果有人可以发布一个静态类型的解决方案,我会投票给它,但鉴于任何系统中较大的延迟几乎肯定会与事件中心存档blob的连接,我不担心解析性能。 🙂
本要点介绍了如何使用Microsoft.Hadoop.Avro2使用C#反序列化事件中心捕获,它具有兼容.NET Framework 4.5和.NET Standard 1.6的优点:
var connectionString = ""; var containerName = " "; var blobName = " "; var storageAccount = CloudStorageAccount.Parse(connectionString); var blobClient = storageAccount.CreateCloudBlobClient(); var container = blobClient.GetContainerReference(containerName); var blob = container.GetBlockBlobReference(blobName); using (var stream = blob.OpenRead()) using (var reader = AvroContainer.CreateGenericReader(stream)) while (reader.MoveNext()) foreach (dynamic result in reader.Current.Objects) { var record = new AvroEventData(result); record.Dump(); } public struct AvroEventData { public AvroEventData(dynamic record) { SequenceNumber = (long) record.SequenceNumber; Offset = (string) record.Offset; DateTime.TryParse((string) record.EnqueuedTimeUtc, out var enqueuedTimeUtc); EnqueuedTimeUtc = enqueuedTimeUtc; SystemProperties = (Dictionary) record.SystemProperties; Properties = (Dictionary) record.Properties; Body = (byte[]) record.Body; } public long SequenceNumber { get; set; } public string Offset { get; set; } public DateTime EnqueuedTimeUtc { get; set; } public Dictionary SystemProperties { get; set; } public Dictionary Properties { get; set; } public byte[] Body { get; set; } }
-
NuGet参考:
- Microsoft.Hadoop.Avro2(1.2.1有效)
- WindowsAzure.Storage(8.3.0有效)
-
命名空间:
- Microsoft.Hadoop.Avro.Container
- Microsoft.WindowsAzure.Storage
我终于能够使用Apache C#库/框架了。
我被困了一段时间,因为Azure事件中心的Capturefunction有时会输出一个没有任何消息内容的文件。 我可能还有一个问题,如何将消息最初序列化为EventData对象。
下面的代码用于从捕获blob容器保存到磁盘的文件。
var dataFileReader = DataFileReader.OpenReader(file); foreach (var record in dataFileReader.NextEntries) { // Do work on EventData object }
这也可以使用GenericRecord对象。
var dataFileReader = DataFileReader.OpenReader(file);
这花了一些力气才弄明白。 但是我现在同意这个Azure事件中心捕获function是一个很好的function来备份所有事件。 我仍然觉得他们应该像使用Stream Analytic作业输出一样使格式可选,但也许我会习惯Avro。
我怀疑你剩下的类型应该被定义为:
[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")] [KnownType(typeof(Dictionary))] public class EventData { [DataMember] public IDictionary SystemProperties { get; set; } [DataMember] public IDictionary Properties { get; set; } [DataMember] public byte[] Body { get; set; } }
即使Body
是null
和bytes
的并集,它也会映射到nullable
byte[]
。
在C#中,数组总是引用类型,因此可以为null
并完成契约。