[HADOOP] 카프카는 연결 - HDFS와 ExtractTopic 변환 싱크 커넥터가 NullPointerException이 발생합니다
HADOOP카프카는 연결 - HDFS와 ExtractTopic 변환 싱크 커넥터가 NullPointerException이 발생합니다
I 합류 HDFS를 사용하고는 카프카 2.0.0과 커넥터 5.0.0 싱크 나는 ExtractTopic 변환 (https://docs.confluent.io/current/connect/transforms/extracttopic.html)를 사용해야합니다. 내 커넥터는 잘 작동하지만이 변환을 추가 할 때 난 단지 2 속성을 가진 간단한 데이터 샘플에 NullPointerException이 얻을.
ERROR Task hive-table-test-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
java.lang.NullPointerException
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:352)
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:464)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
여기서 커넥터의 구성은 :
name=hive-table-test
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=hive_table_test
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=${env.SCHEMA_REGISTRY_URL}
value.converter.schema.registry.url=${env.SCHEMA_REGISTRY_URL}
schema.compatibility=BACKWARD
# HDFS configuration
# Use store.url instead of hdfs.url (deprecated) in later versions. Property store.url does not work, yet
hdfs.url=${env.HDFS_URL}
hadoop.conf.dir=/etc/hadoop/conf
hadoop.home=/opt/cloudera/parcels/CDH/lib/hadoop
topics.dir=${env.HDFS_TOPICS_DIR}
# Connector configuration
format.class=io.confluent.connect.hdfs.avro.AvroFormat
flush.size=100
rotate.interval.ms=60000
# Hive integration
hive.integration=true
hive.metastore.uris=${env.HIVE_METASTORE_URIS}
hive.conf.dir=/etc/hive/conf
hive.home=/opt/cloudera/parcels/CDH/lib/hive
hive.database=kafka_connect
# Transformations
transforms=InsertMetadata, ExtractTopic
transforms.InsertMetadata.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertMetadata.partition.field=partition
transforms.InsertMetadata.offset.field=offset
transforms.ExtractTopic.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ExtractTopic.field=name
transforms.ExtractTopic.skip.missing.or.null=true
내가 스키마 레지스트리를 사용하고, 데이터는 브로 형식으로 내가 지정된 속성 이름이 null이 확신합니다. 어떤 제안? 내가 필요한 것은 주어진 필드의 내용을 추출 및 주제의 이름으로 사용하기 위해 기본적으로.
편집하다:
심지어 브로 형식이 같은 간단한 JSON에서 발생합니다 :
{
"attr": "tmp",
"name": "topic1"
}
해결법
-
==============================
1.당신이 당신의 변환에 항목의 이름을 변경하기 때문에 짧은 대답이다.
당신이 당신의 변환에 항목의 이름을 변경하기 때문에 짧은 대답이다.
각 주제 파티션 HDFS 커넥터는 별도의 TopicPartitionWriter있다. 메시지 처리에 대한 책임이 SinkTask 각 파티션 TopicPartitionWriter 오픈 (...) 방법으로 생성 될 때 생성된다.
이 SinkRecords을 처리 할 때, 그것은 TopicPartitionWriter에 대한 조회 및 버퍼 레코드를 추가하려고 주제 이름과 파티션 수에 따라. 귀하의 경우이 메시지에 대한 기록을 찾을 수 없습니다. 주제 이름을 변환하여, 그 쌍 (항목, 파티션) 상관 TopicPartitionWriter이 생성되지 대해 변경되었다.
HdfsSinkTask에 전달되는 SinkRecords :: 넣어 (컬렉션
기록), 어떤 변환을 적용 할 필요가 없습니다 파티션과 주제는 이미 설정했습니다. 나는 io.confluent.connect.transforms.ExtractTopic 오히려 SourceConnector에 사용한다고 생각합니다.
from https://stackoverflow.com/questions/54367856/kafka-connect-extracttopic-transformation-with-hdfs-sink-connector-throws-null by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] HBase를 회복 (0) | 2019.10.07 |
---|---|
[HADOOP] 분산 캐시를 통해 매퍼에서 파일을 액세스 (0) | 2019.10.06 |
[HADOOP] 스파크 스트리밍 빈 파티션에 대한 쓰기 파일을 피하십시오 (0) | 2019.10.06 |
[HADOOP] HDFS -du 결과를 이해하는 방법 (0) | 2019.10.06 |
[HADOOP] 입력으로 이미지와 맵리 듀스 작업을 만듭니다 (0) | 2019.10.06 |