[SPRING] 카프카에서 여러 번 같은 메시지 읽기
SPRING카프카에서 여러 번 같은 메시지 읽기
나는 Spring Kafka API를 사용하여 Kafka 소비자에게 수동 오프셋 관리를 구현합니다.
@KafkaListener(topics = "some_topic")
public void onMessage(@Payload Message message, Acknowledgment acknowledgment) {
if (someCondition) {
acknowledgment.acknowledge();
}
}
여기서는 someCondition이 보유한 경우에만 소비자가 오프셋을 적용하기를 원합니다. 그렇지 않으면 소비자는 잠시 동안 잠자기하고 같은 메시지를 다시 읽어야합니다.
카프카 구성 :
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfig());
factory.getContainerProperties().setAckMode(MANUAL);
return factory;
}
private Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<>();
...
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
...
return props;
}
현재 구성에서 someCondition == false이면 소비자는 오프셋을 커밋하지 않지만 여전히 다음 메시지를 읽습니다. 카프카 (Kafka)의 승인을받지 못하면 소비자가 메시지를 다시 읽게하는 방법이 있습니까?
해결법
-
==============================
1.컨테이너를 중지했다가 다시 시작하면 컨테이너가 다시 전송됩니다.
컨테이너를 중지했다가 다시 시작하면 컨테이너가 다시 전송됩니다.
곧 출시 될 1.1 버전에서는 필요한 오프셋을 찾아 재전송 할 수 있습니다.
그러나 이미 검색된 메일은 나중에 표시되므로 나중에 메일을 삭제해야합니다.
두 번째 이정표는 그 기능을 가지고 있으며 다음 주에 발표 될 것으로 기대합니다.
-
==============================
2.@Gary가 이미 지적했듯이, 올바른 방향으로 가고 있습니다. seek ()이 그것을 수행하는 방법입니다. 이 문제에 직면했을 때 저는 오늘 그 코드 예제를 찾을 수 없었습니다. 문제 해결을 원하는 사람을위한 코드는 다음과 같습니다.
@Gary가 이미 지적했듯이, 올바른 방향으로 가고 있습니다. seek ()이 그것을 수행하는 방법입니다. 이 문제에 직면했을 때 저는 오늘 그 코드 예제를 찾을 수 없었습니다. 문제 해결을 원하는 사람을위한 코드는 다음과 같습니다.
public class Receiver implements AcknowledgingMessageListener<Integer, String>, ConsumerSeekAware { private ConsumerSeekCallback consumerSeekCallback; @Override public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment acknowledgment) { if (/*some condition*/) { //process acknowledgment.acknowledge(); //send ack } else { consumerSeekCallback.seek("your.topic", record.partition(), record.offset()); } } @Override public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) { this.consumerSeekCallback = consumerSeekCallback; } @Override public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) { // nothing is needed here for this program } @Override public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) { // nothing is needed here for this program } }
from https://stackoverflow.com/questions/39536012/reading-the-same-message-several-times-from-kafka by cc-by-sa and MIT license
'SPRING' 카테고리의 다른 글
[SPRING] sql prepared statement, spring, SimpleJDBCTemplate에서 LIKE 절 사용 (0) | 2019.05.14 |
---|---|
[SPRING] Spring : @Aspect 주석으로 주석 된 클래스에 모의 삽입을 할 수 없다. (0) | 2019.05.14 |
[SPRING] @ConfigurationProperties를 @Configuration에 autowire하는 방법은 무엇입니까? (0) | 2019.05.14 |
[SPRING] JUnit 테스트 케이스는 eclipse와 함께 전달되지만 maven 빌드에서는 실패합니다. (0) | 2019.05.14 |
[SPRING] Spring의 데이터베이스 연결 관리 (0) | 2019.05.14 |