복붙노트

[SPRING] 카프카에서 여러 번 같은 메시지 읽기

SPRING

카프카에서 여러 번 같은 메시지 읽기

나는 Spring Kafka API를 사용하여 Kafka 소비자에게 수동 오프셋 관리를 구현합니다.

@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
    if (someCondition) {
        acknowledgment.acknowledge();
    }
}

여기서는 someCondition이 보유한 경우에만 소비자가 오프셋을 적용하기를 원합니다. 그렇지 않으면 소비자는 잠시 동안 잠자기하고 같은 메시지를 다시 읽어야합니다.

카프카 구성 :

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig());
    factory.getContainerProperties().setAckMode(MANUAL);
    return factory;
}

private Map<String, Object> consumerConfig() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    ...
    return props;
}

현재 구성에서 someCondition == false이면 소비자는 오프셋을 커밋하지 않지만 여전히 다음 메시지를 읽습니다. 카프카 (Kafka)의 승인을받지 못하면 소비자가 메시지를 다시 읽게하는 방법이 있습니까?

해결법

  1. ==============================

    1.컨테이너를 중지했다가 다시 시작하면 컨테이너가 다시 전송됩니다.

    컨테이너를 중지했다가 다시 시작하면 컨테이너가 다시 전송됩니다.

    곧 출시 될 1.1 버전에서는 필요한 오프셋을 찾아 재전송 할 수 있습니다.

    그러나 이미 검색된 메일은 나중에 표시되므로 나중에 메일을 삭제해야합니다.

    두 번째 이정표는 그 기능을 가지고 있으며 다음 주에 발표 될 것으로 기대합니다.

  2. ==============================

    2.@Gary가 이미 지적했듯이, 올바른 방향으로 가고 있습니다. seek ()이 그것을 수행하는 방법입니다. 이 문제에 직면했을 때 저는 오늘 그 코드 예제를 찾을 수 없었습니다. 문제 해결을 원하는 사람을위한 코드는 다음과 같습니다.

    @Gary가 이미 지적했듯이, 올바른 방향으로 가고 있습니다. seek ()이 그것을 수행하는 방법입니다. 이 문제에 직면했을 때 저는 오늘 그 코드 예제를 찾을 수 없었습니다. 문제 해결을 원하는 사람을위한 코드는 다음과 같습니다.

    public class Receiver implements AcknowledgingMessageListener<Integer, String>, ConsumerSeekAware {
    
        private ConsumerSeekCallback consumerSeekCallback;
    
    
        @Override
        public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment acknowledgment) {
    
            if (/*some condition*/) {
                //process
                acknowledgment.acknowledge(); //send ack
            } else {
    
                consumerSeekCallback.seek("your.topic", record.partition(), record.offset());
    
            }
        }
    
        @Override
        public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
            this.consumerSeekCallback = consumerSeekCallback;
        }
    
        @Override
        public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
    
            // nothing is needed here for this program
        }
    
        @Override
        public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
    
            // nothing is needed here for this program
        }
    
    }
    
  3. from https://stackoverflow.com/questions/39536012/reading-the-same-message-several-times-from-kafka by cc-by-sa and MIT license