복붙노트

[HADOOP] 카프카는 연결 - HDFS와 ExtractTopic 변환 싱크 커넥터가 NullPointerException이 발생합니다

HADOOP

카프카는 연결 - HDFS와 ExtractTopic 변환 싱크 커넥터가 NullPointerException이 발생합니다

I 합류 HDFS를 사용하고는 카프카 2.0.0과 커넥터 5.0.0 싱크 나는 ExtractTopic 변환 (https://docs.confluent.io/current/connect/transforms/extracttopic.html)를 사용해야합니다. 내 커넥터는 잘 작동하지만이 변환을 추가 할 때 난 단지 2 속성을 가진 간단한 데이터 샘플에 NullPointerException이 얻을.

ERROR Task hive-table-test-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
java.lang.NullPointerException
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:352)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748) 

여기서 커넥터의 구성은 :

name=hive-table-test
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=hive_table_test

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=${env.SCHEMA_REGISTRY_URL}
value.converter.schema.registry.url=${env.SCHEMA_REGISTRY_URL}
schema.compatibility=BACKWARD

# HDFS configuration
# Use store.url instead of hdfs.url (deprecated) in later versions. Property store.url does not work, yet
hdfs.url=${env.HDFS_URL}
hadoop.conf.dir=/etc/hadoop/conf
hadoop.home=/opt/cloudera/parcels/CDH/lib/hadoop
topics.dir=${env.HDFS_TOPICS_DIR}

# Connector configuration
format.class=io.confluent.connect.hdfs.avro.AvroFormat
flush.size=100
rotate.interval.ms=60000

# Hive integration
hive.integration=true
hive.metastore.uris=${env.HIVE_METASTORE_URIS}
hive.conf.dir=/etc/hive/conf
hive.home=/opt/cloudera/parcels/CDH/lib/hive
hive.database=kafka_connect

# Transformations
transforms=InsertMetadata, ExtractTopic
transforms.InsertMetadata.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertMetadata.partition.field=partition
transforms.InsertMetadata.offset.field=offset

transforms.ExtractTopic.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ExtractTopic.field=name
transforms.ExtractTopic.skip.missing.or.null=true

내가 스키마 레지스트리를 사용하고, 데이터는 브로 형식으로 내가 지정된 속성 이름이 null이 확신합니다. 어떤 제안? 내가 필요한 것은 주어진 필드의 내용을 추출 및 주제의 이름으로 사용하기 위해 기본적으로.

편집하다:

심지어 브로 형식이 같은 간단한 JSON에서 발생합니다 :

{
   "attr": "tmp",
   "name": "topic1"
}

해결법

  1. ==============================

    1.당신이 당신의 변환에 항목의 이름을 변경하기 때문에 짧은 대답이다.

    당신이 당신의 변환에 항목의 이름을 변경하기 때문에 짧은 대답이다.

    각 주제 파티션 HDFS 커넥터는 별도의 TopicPartitionWriter있다. 메시지 처리에 대한 책임이 SinkTask 각 파티션 TopicPartitionWriter 오픈 (...) 방법으로 생성 될 때 생성된다.

    이 SinkRecords을 처리 할 때, 그것은 TopicPartitionWriter에 대한 조회 및 버퍼 레코드를 추가하려고 주제 이름과 파티션 수에 따라. 귀하의 경우이 메시지에 대한 기록을 찾을 수 없습니다. 주제 이름을 변환하여, 그 쌍 (항목, 파티션) 상관 TopicPartitionWriter이 생성되지 대해 변경되었다.

    HdfsSinkTask에 전달되는 SinkRecords :: 넣어 (컬렉션 기록), 어떤 변환을 적용 할 필요가 없습니다 파티션과 주제는 이미 설정했습니다.

    나는 io.confluent.connect.transforms.ExtractTopic 오히려 SourceConnector에 사용한다고 생각합니다.

  2. from https://stackoverflow.com/questions/54367856/kafka-connect-extracttopic-transformation-with-hdfs-sink-connector-throws-null by cc-by-sa and MIT license