使用C#反序列化Avro文件

我找不到用C#反序列化Apache Avro文件的方法。 Avro文件是Microsoft Azure Event Hub中的存档function生成的文件。

使用Java,我可以使用Apache的Avro Tools将文件转换为JSON:

java -jar avro-tools-1.8.1.jar tojson --pretty inputfile > output.json 

使用NuGet包Microsoft.Hadoop.Avro我能够提取SequenceNumberOffsetEnqueuedTimeUtc ,但由于我不知道Body使用什么类型,因此抛出exception。 我尝试过使用Dictionary和其他类型。

 static void Main(string[] args) { var fileName = "..."; using (Stream stream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read)) { using (var reader = AvroContainer.CreateReader(stream)) { using (var streamReader = new SequentialReader(reader)) { var record = streamReader.Objects.FirstOrDefault(); } } } } [DataContract(Namespace = "Microsoft.ServiceBus.Messaging")] public class EventData { [DataMember(Name = "SequenceNumber")] public long SequenceNumber { get; set; } [DataMember(Name = "Offset")] public string Offset { get; set; } [DataMember(Name = "EnqueuedTimeUtc")] public string EnqueuedTimeUtc { get; set; } [DataMember(Name = "Body")] public foo Body { get; set; } // More properties... } 

架构如下所示:

 { "type": "record", "name": "EventData", "namespace": "Microsoft.ServiceBus.Messaging", "fields": [ { "name": "SequenceNumber", "type": "long" }, { "name": "Offset", "type": "string" }, { "name": "EnqueuedTimeUtc", "type": "string" }, { "name": "SystemProperties", "type": { "type": "map", "values": [ "long", "double", "string", "bytes" ] } }, { "name": "Properties", "type": { "type": "map", "values": [ "long", "double", "string", "bytes" ] } }, { "name": "Body", "type": [ "null", "bytes" ] } ] } 

我能够使用dynamic获得完整的数据访问。 这是访问原始body数据的代码,它存储为一个字节数组。 在我的例子中,这些字节包含UTF8编码的JSON,但当然这取决于您最初创建发布到事件中心的EventData实例的方式:

 using (var reader = AvroContainer.CreateGenericReader(stream)) { while (reader.MoveNext()) { foreach (dynamic record in reader.Current.Objects) { var sequenceNumber = record.SequenceNumber; var bodyText = Encoding.UTF8.GetString(record.Body); Console.WriteLine($"{sequenceNumber}: {bodyText}"); } } } 

如果有人可以发布一个静态类型的解决方案,我会投票给它,但鉴于任何系统中较大的延迟几乎肯定会与事件中心存档blob的连接,我不担心解析性能。 🙂

本要点介绍了如何使用Microsoft.Hadoop.Avro2使用C#反序列化事件中心捕获,它具有兼容.NET Framework 4.5和.NET Standard 1.6的优点:

  var connectionString = ""; var containerName = ""; var blobName = ""; var storageAccount = CloudStorageAccount.Parse(connectionString); var blobClient = storageAccount.CreateCloudBlobClient(); var container = blobClient.GetContainerReference(containerName); var blob = container.GetBlockBlobReference(blobName); using (var stream = blob.OpenRead()) using (var reader = AvroContainer.CreateGenericReader(stream)) while (reader.MoveNext()) foreach (dynamic result in reader.Current.Objects) { var record = new AvroEventData(result); record.Dump(); } public struct AvroEventData { public AvroEventData(dynamic record) { SequenceNumber = (long) record.SequenceNumber; Offset = (string) record.Offset; DateTime.TryParse((string) record.EnqueuedTimeUtc, out var enqueuedTimeUtc); EnqueuedTimeUtc = enqueuedTimeUtc; SystemProperties = (Dictionary) record.SystemProperties; Properties = (Dictionary) record.Properties; Body = (byte[]) record.Body; } public long SequenceNumber { get; set; } public string Offset { get; set; } public DateTime EnqueuedTimeUtc { get; set; } public Dictionary SystemProperties { get; set; } public Dictionary Properties { get; set; } public byte[] Body { get; set; } } 
  • NuGet参考:

    • Microsoft.Hadoop.Avro2(1.2.1有效)
    • WindowsAzure.Storage(8.3.0有效)
  • 命名空间:

    • Microsoft.Hadoop.Avro.Container
    • Microsoft.WindowsAzure.Storage

我终于能够使用Apache C#库/框架了。
我被困了一段时间,因为Azure事件中心的Capturefunction有时会输出一个没有任何消息内容的文件。 我可能还有一个问题,如何将消息最初序列化为EventData对象。
下面的代码用于从捕获blob容器保存到磁盘的文件。

 var dataFileReader = DataFileReader.OpenReader(file); foreach (var record in dataFileReader.NextEntries) { // Do work on EventData object } 

这也可以使用GenericRecord对象。

 var dataFileReader = DataFileReader.OpenReader(file); 

这花了一些力气才弄明白。 但是我现在同意这个Azure事件中心捕获function是一个很好的function来备份所有事件。 我仍然觉得他们应该像使用Stream Analytic作业输出一样使格式可选,但也许我会习惯Avro。

我怀疑你剩下的类型应该被定义为:

 [DataContract(Namespace = "Microsoft.ServiceBus.Messaging")] [KnownType(typeof(Dictionary))] public class EventData { [DataMember] public IDictionary SystemProperties { get; set; } [DataMember] public IDictionary Properties { get; set; } [DataMember] public byte[] Body { get; set; } } 

即使Bodynullbytes的并集,它也会映射到nullable byte[]

在C#中,数组总是引用类型,因此可以为null并完成契约。