복붙노트

[HADOOP] C #으로 Avro 파일을 비 직렬화

HADOOP

C #으로 Avro 파일을 비 직렬화

C #으로 Apache Avro 파일을 비 직렬화하는 방법을 찾을 수 없습니다. Avro 파일은 Microsoft Azure 이벤트 허브의 보관 기능에 의해 생성 된 파일입니다.

Java를 사용하면 Apache에서 Avro Tools를 사용하여 파일을 JSON으로 변환 할 수 있습니다.

java -jar avro-tools-1.8.1.jar tojson --pretty inputfile > output.json

NuGet 패키지 Microsoft.Hadoop.Avro를 사용하여 SequenceNumber, Offset 및 EnqueuedTimeUtc를 추출 할 수 있지만 Body에 사용할 형식을 알 수 없으므로 예외가 throw됩니다. Dictionary 및 다른 유형으로 시도했습니다.

static void Main(string[] args)
{
    var fileName = "...";

    using (Stream stream = new FileStream(fileName, FileMode.Open, FileAccess.Read, FileShare.Read))
    {
        using (var reader = AvroContainer.CreateReader<EventData>(stream))
        {
            using (var streamReader = new SequentialReader<EventData>(reader))
            {
                var record = streamReader.Objects.FirstOrDefault();
            }
        }
    }
}

[DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
public class EventData
{
    [DataMember(Name = "SequenceNumber")]
    public long SequenceNumber { get; set; }

    [DataMember(Name = "Offset")]
    public string Offset { get; set; }

    [DataMember(Name = "EnqueuedTimeUtc")]
    public string EnqueuedTimeUtc { get; set; }

    [DataMember(Name = "Body")]
    public foo Body { get; set; }

    // More properties...
}

스키마는 다음과 같습니다.

{
  "type": "record",
  "name": "EventData",
  "namespace": "Microsoft.ServiceBus.Messaging",
  "fields": [
    {
      "name": "SequenceNumber",
      "type": "long"
    },
    {
      "name": "Offset",
      "type": "string"
    },
    {
      "name": "EnqueuedTimeUtc",
      "type": "string"
    },
    {
      "name": "SystemProperties",
      "type": {
        "type": "map",
        "values": [ "long", "double", "string", "bytes" ]
      }
    },
    {
      "name": "Properties",
      "type": {
        "type": "map",
        "values": [ "long", "double", "string", "bytes" ]
      }
    },
    {
      "name": "Body",
      "type": [ "null", "bytes" ]
    }
  ]
}    

해결법

  1. ==============================

    1.동적 인 기능을 사용하여 전체 데이터 액세스를 수행 할 수있었습니다. 다음은 바이트의 배열로 저장되는 원시 본문 데이터에 액세스하는 코드입니다. 필자의 경우, 해당 바이트에는 UTF8로 인코딩 된 JSON이 포함되어 있지만, 당연히 Event Hub에 게시 한 EventData 인스턴스를 처음 생성 한 방법에 따라 다릅니다.

    동적 인 기능을 사용하여 전체 데이터 액세스를 수행 할 수있었습니다. 다음은 바이트의 배열로 저장되는 원시 본문 데이터에 액세스하는 코드입니다. 필자의 경우, 해당 바이트에는 UTF8로 인코딩 된 JSON이 포함되어 있지만, 당연히 Event Hub에 게시 한 EventData 인스턴스를 처음 생성 한 방법에 따라 다릅니다.

    using (var reader = AvroContainer.CreateGenericReader(stream))
    {
        while (reader.MoveNext())
        {
            foreach (dynamic record in reader.Current.Objects)
            {
                var sequenceNumber = record.SequenceNumber;
                var bodyText = Encoding.UTF8.GetString(record.Body);
                Console.WriteLine($"{sequenceNumber}: {bodyText}");
            }
        }
    }
    

    누군가 정적으로 타이핑 한 솔루션을 게시 할 수 있다면, 나는 그것을 업 보봇 (upvote) 할 것이지만, 어떤 시스템에서 더 큰 대기 시간이 Event Hub Archive blob에 대한 연결이 될 것이라는 점을 감안할 때, 퍼포먼스 분석에 대해서는 걱정할 필요가 없다. :)

  2. ==============================

    2.이 요령은 .NET Framework 4.5 및 .NET Standard 1.6을 준수한다는 장점이있는 Microsoft.Hadoop.Avro2를 사용하여 C #으로 이벤트 허브 캡처를 deserialize하는 방법을 보여줍니다.

    이 요령은 .NET Framework 4.5 및 .NET Standard 1.6을 준수한다는 장점이있는 Microsoft.Hadoop.Avro2를 사용하여 C #으로 이벤트 허브 캡처를 deserialize하는 방법을 보여줍니다.

     var connectionString = "<Azure event hub capture storage account connection string>";
     var containerName = "<Azure event hub capture container name>";
     var blobName = "<Azure event hub capture BLOB name (ends in .avro)>";
    
     var storageAccount = CloudStorageAccount.Parse(connectionString);
     var blobClient = storageAccount.CreateCloudBlobClient();
     var container = blobClient.GetContainerReference(containerName);
     var blob = container.GetBlockBlobReference(blobName);
     using (var stream = blob.OpenRead())
     using (var reader = AvroContainer.CreateGenericReader(stream))
         while (reader.MoveNext())
             foreach (dynamic result in reader.Current.Objects)
             {
                 var record = new AvroEventData(result);
                 record.Dump();
             }
    
     public struct AvroEventData
     {
         public AvroEventData(dynamic record)
         {
             SequenceNumber = (long) record.SequenceNumber;
             Offset = (string) record.Offset;
             DateTime.TryParse((string) record.EnqueuedTimeUtc, out var enqueuedTimeUtc);
             EnqueuedTimeUtc = enqueuedTimeUtc;
             SystemProperties = (Dictionary<string, object>) record.SystemProperties;
             Properties = (Dictionary<string, object>) record.Properties;
             Body = (byte[]) record.Body;
         }
         public long SequenceNumber { get; set; }
         public string Offset { get; set; }
         public DateTime EnqueuedTimeUtc { get; set; }
         public Dictionary<string, object> SystemProperties { get; set; }
         public Dictionary<string, object> Properties { get; set; }
         public byte[] Body { get; set; }
     }
    
  3. ==============================

    3.마침내 아파치 C # 라이브러리 / 프레임 워크를 사용할 수있게되었습니다. Azure Event Hub의 캡처 기능이 메시지 내용없이 파일을 출력하기 때문에 잠시 멈추었습니다. 메시지가 원래 EventData 개체에 직렬화 된 방식에 문제가있을 수도 있습니다. 아래의 코드는 캡처 블롭 컨테이너에서 디스크에 저장된 파일을위한 것입니다.

    마침내 아파치 C # 라이브러리 / 프레임 워크를 사용할 수있게되었습니다. Azure Event Hub의 캡처 기능이 메시지 내용없이 파일을 출력하기 때문에 잠시 멈추었습니다. 메시지가 원래 EventData 개체에 직렬화 된 방식에 문제가있을 수도 있습니다. 아래의 코드는 캡처 블롭 컨테이너에서 디스크에 저장된 파일을위한 것입니다.

    var dataFileReader = DataFileReader<EventData>.OpenReader(file);
    foreach (var record in dataFileReader.NextEntries)
    {
       // Do work on EventData object
    }
    

    이것은 또한 GenericRecord 객체를 사용하여 작동합니다.

    var dataFileReader = DataFileReader<GenericRecord>.OpenReader(file);
    

    이것은 알아 내려고 노력했습니다. 그러나 나는 이제 Azure Event Hubs Capture 기능이 모든 이벤트를 백업하는 훌륭한 기능임을 동의합니다. 여전히 Stream Analytic 작업 출력에서와 같이 형식을 선택적으로 설정해야한다고 생각하지만 아마도 Avro에 익숙해 질 것입니다.

  4. ==============================

    4.나머지 유형은 다음과 같이 정의해야합니다.

    나머지 유형은 다음과 같이 정의해야합니다.

    [DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
    [KnownType(typeof(Dictionary<string, object>))]
    public class EventData
    {
        [DataMember]
        public IDictionary<string, object> SystemProperties { get; set; }
    
        [DataMember]
        public IDictionary<string, object> Properties { get; set; }
    
        [DataMember]
        public byte[] Body { get; set; }
    }
    

    Body가 null과 바이트의 결합체이더라도 nullable byte []로 매핑됩니다.

    C #에서는 배열이 항상 참조 유형이므로 null이 될 수 있고 계약이 성립됩니다.

  5. ==============================

    5.또한 NullableSchema 특성을 사용하여 본문을 바이트와 null의 합집합으로 표시 할 수 있습니다. 이렇게하면 강력하게 형식화 된 인터페이스를 사용할 수 있습니다.

    또한 NullableSchema 특성을 사용하여 본문을 바이트와 null의 합집합으로 표시 할 수 있습니다. 이렇게하면 강력하게 형식화 된 인터페이스를 사용할 수 있습니다.

    [DataContract(Namespace = "Microsoft.ServiceBus.Messaging")]
    public class EventData
    {
        [DataMember(Name = "SequenceNumber")]
        public long SequenceNumber { get; set; }
    
        [DataMember(Name = "Offset")]
        public string Offset { get; set; }
    
        [DataMember(Name = "EnqueuedTimeUtc")]
        public string EnqueuedTimeUtc { get; set; }
    
        [DataMember(Name = "Body")]
        [NullableSchema]
        public foo Body { get; set; }
    }
    
  6. from https://stackoverflow.com/questions/39846833/deserialize-an-avro-file-with-c-sharp by cc-by-sa and MIT license