복붙노트

[HADOOP] 시간에 따른 버킷 레코드 (kafka-hdfs-connector)

HADOOP

시간에 따른 버킷 레코드 (kafka-hdfs-connector)

Confluent 플랫폼에서 제공하는 kafka-hdfs-connector를 사용하여 Kafka에서 Hive 테이블로 데이터를 복사하려고합니다. 성공적으로 수행 할 수 있지만 시간 간격을 기준으로 들어오는 데이터를 버킷 화하는 방법이 궁금합니다. 예를 들어 5 분마다 새 파티션을 만들고 싶습니다.

partition.duration.ms로 io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner를 사용해 보았지만 잘못된 방법으로 생각합니다. Hive 테이블에는 모든 데이터가 특정 파티션으로 이동하는 파티션이 하나만 있습니다. 이 같은 :

hive> show partitions test;
OK
partition
year=2016/month=03/day=15/hour=19/minute=03

그리고 모든 avro 객체가이 파티션으로 복사됩니다.

대신 다음과 같은 것을 갖고 싶습니다.

hive> show partitions test;
OK
partition
year=2016/month=03/day=15/hour=19/minute=03
year=2016/month=03/day=15/hour=19/minute=08
year=2016/month=03/day=15/hour=19/minute=13

처음에 커넥터는 year = 2016 / month = 03 / day = 15 / hour = 19 / minute = 03 경로를 작성하고 다음 5 분 동안 그리고 6 분 시작시 모든 수신 데이터를이 디렉토리에 계속 복사합니다. 새 경로 (예 : year = 2016 / month = 03 / day = 15 / hour = 19 / minute = 08)를 만들고 다음 5 분 동안의 데이터를이 디렉토리에 복사해야합니다.

내 구성 파일은 다음과 같습니다.

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test
hdfs.url=hdfs://localhost:9000
flush.size=3
partitioner.class=io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner
partition.duration.ms=300000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=MM/
locale=en
timezone=GMT
logs.dir=/kafka-connect/logs
topics.dir=/kafka-connect/topics
hive.integration=true
hive.metastore.uris=thrift://localhost:9083
schema.compatibility=BACKWARD

누군가가 올바른 방향으로 나를 가리킬 수 있다면 정말 도움이 될 것입니다. 필요한 경우 자세한 내용을 공유하게되어 기쁩니다. 이 질문을 결코 끝나지 않는 것처럼 보이게하고 싶지 않습니다.

많은 감사합니다!

해결법

  1. ==============================

    1.path.format의 분 필드가 잘못되었습니다.

    path.format의 분 필드가 잘못되었습니다.

    path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=MM/
    

    그것은해야한다:

    path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=mm/
    
  2. from https://stackoverflow.com/questions/36036507/bucket-records-based-on-timekafka-hdfs-connector by cc-by-sa and MIT license