복붙노트

[SPRING] 카프카 스프링 통합 : 카프카 소비자에게 헤더가 오지 않는다.

SPRING

카프카 스프링 통합 : 카프카 소비자에게 헤더가 오지 않는다.

kafka를 사용하여 메시지를 게시하고 소비하는 데 Kafka Spring Integration을 사용하고 있습니다. 페이로드가 제작자에서 소비자로 제대로 전달되는 것을 볼 수 있지만 헤더 정보가 어딘가에서 오버라이드되고 있습니다.

@ServiceActivator(inputChannel = "fromKafka")
public void processMessage(Message<?> message) throws InterruptedException,
        ExecutionException {
    try {
            System.out.println("Headers :" + message.getHeaders().toString());
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

다음 헤더를 얻습니다.

Headers :{timestamp=1440013920609, id=f8c645f7-677b-ec32-dad0-a7b79082ef81}

다음과 같이 제작자 측에서 메시지를 생성합니다.

Message<FeelDBMessage> message = MessageBuilder
                .withPayload(samplePayloadObj)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "key")
                .setHeader(KafkaHeaders.TOPIC, "sampleTopic").build();

        // publish the message
        publisher.publishMessage(message);

아래는 제작자의 헤더 정보입니다.

 headers={timestamp=1440013914085, id=c4159c1c-2c67-634b-ef8d-3fb026b1172e, kafka_messageKey=key, kafka_topic=sampleTopic}

헤더가 다른 값으로 대체되는 이유는 무엇입니까?

해결법

  1. ==============================

    1.그냥 기본적으로 프레임 워크는 불변의 GenericMessage를 사용하기 때문입니다.

    그냥 기본적으로 프레임 워크는 불변의 GenericMessage를 사용하기 때문입니다.

    기존 메시지 (예 : MessageBuilder.withPayload)를 조작하면 새로운 GenericMessage 인스턴스가 생성됩니다.

    다른면에서 Kafka는 JMS 또는 AMQP와 같은 헤더 추상화를 지원하지 않습니다. KafkaProducerMessageHandler가 Kafka에 메시지를 게시 할 때이 작업을 수행하는 이유는 다음과 같습니다.

    this.kafkaProducerContext.send(topic, partitionId, messageKey, message.getPayload());
    

    보시다시피 헤더를 보내지 않습니다. 따라서 다른 쪽 (소비자)은 화제의 메시지 만 페이로드로 처리하고 일부 시스템 옵션은 topic, partition, messageKey와 같은 헤더로 처리합니다.

    두 단어로, 카프카가 헤더를 지원하지 않기 때문에 카프카에 헤더를 전송하지 않습니다.

  2. from https://stackoverflow.com/questions/32104810/kafka-spring-integration-headers-not-coming-for-kafka-consumer by cc-by-sa and MIT license