[SPRING] Spring @ KafkaListener는 일정한 간격 후에 레코드를 실행하고 폴링합니다.
SPRINGSpring @ KafkaListener는 일정한 간격 후에 레코드를 실행하고 폴링합니다.
일정한 간격 (예 : 5 분마다) 후에 레코드를 소비하려고했습니다. 소비자 자산은 표준입니다.
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(1);
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(300000);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
return factory;
}
비록 내가 속성을 변경할 때 PoolTimeout은 정의 된 간격 (5 분) 후에 폴링하지 않고 30 초 후에 계속 폴링하며 여기에 내 로그가 있습니다.
2018-01-23 18:07:26.875 INFO 60905 --- [ 2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer : Consumed: san@1516710960000->1516711080000 2
2018-01-23 18:07:56.901 INFO 60905 --- [ 2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer : Consumed: san@1516710960000->1516711080000 4
우리는 windowed aggregations을 가진 kafka stream application을 만들고 y interval 후에 window x을 소비하려고 계획하고있었습니다.
클래스에서 KafkaMessageListenerContainer, setConsumerTaskExecutor가 설정되었음을 알 수 있습니다.
if (containerProperties.getConsumerTaskExecutor() == null) {
SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
}
그러나이 (빈도) 스레드 풀이 레코드를 폴링 할 때 어떻게 구성해야합니까? 어떤 도움을 주셔서 감사합니다.
해결법
-
==============================
1.소비자가 투표하는 비율을 제어 할 수는 없습니다. pollTimeout은 poll ()이 새 레코드가 도착하기를 기다리는 시간입니다. 새로운 기록이 더 자주 도착하면 오래 기다리지 않을 것입니다.
소비자가 투표하는 비율을 제어 할 수는 없습니다. pollTimeout은 poll ()이 새 레코드가 도착하기를 기다리는 시간입니다. 새로운 기록이 더 자주 도착하면 오래 기다리지 않을 것입니다.
레코드를받는 속도를 제어하려면 DefatulKafkaConsumerFactory를 사용하여 소비자를 만들고 원하는 경우 언제든지 폴링하십시오.
그래도 @KafkaListener와 함께 사용할 수는 없습니다. 직접 기록을 처리해야합니다.
-
==============================
2.Spring @ KafkaListener를 사용하는 Kafka 소비자의 속도를 제어하려면 다음 방법으로 KafkaListenerEndpointRegistry bean 사용을 autowire하고 필요한 MessageListenerContainer에 액세스하십시오. 그런 다음 pause () 및 resume () 기능을 사용하여 필요한 동작을 제어 할 수 있습니다.
Spring @ KafkaListener를 사용하는 Kafka 소비자의 속도를 제어하려면 다음 방법으로 KafkaListenerEndpointRegistry bean 사용을 autowire하고 필요한 MessageListenerContainer에 액세스하십시오. 그런 다음 pause () 및 resume () 기능을 사용하여 필요한 동작을 제어 할 수 있습니다.
@Autowired private KafkaListenerEndpointRegistry listener; @Autowired private Map<String, Set<String>> getTopicListenerMap(){ List<String> ids = new ArrayList<>(listener.getListenerContainerIds()); Map<String, Set<String>> topicListenerMap = new HashMap<>(); for(String topic: topics){ topicListenerMap.put(topic, new HashSet<>()); } for(String key: ids){ for (String topic : listener.getListenerContainer(key).getContainerProperties().getTopics()){ topicListenerMap.get(topic).add(key); } } return topicListenerMap; } @KafkaListener(topics = "topic", containerFactory = "smsListener") public void listenWithHeaders(@Payload List<String> messageList, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitionList, @Header(KafkaHeaders.OFFSET) List<Integer> offsetList) { try{ LOG.info("Received message count: "+(messageList!=null ? messageList.size(): 0)+", offset start: "+offsetList.get(0)+", end: "+offsetList.get(offsetList.size()-1)); pauseIfRequired(topic); for(int i=0; i<messageList.size(); i++){ // process the messages } }catch (Exception e){ LOG.error("", e); }finally { resumeIfPaused(topic); } } private void pauseIfRequired(String topic){ try{ boolean flag = pausingCondition; if(flag){ LOG.info("pausing topic: "+topic); for(String listenerKey: getTopicListenerMap().get(topic)){ listener.getListenerContainer(listenerKey).pause(); } LOG.info("topic paused: "+topic); } } catch (Exception e){ LOG.error("", e); } } private void resumeIfPaused(String topic){ try { for (String listenerKey : getTopicListenerMap().get(topic)) { LOG.info("topic: "+topic+", containerPauseRequested: "+listener.getListenerContainer(listenerKey).isPauseRequested()); if (listener.getListenerContainer(listenerKey).isPauseRequested()) { LOG.info("waiting to resume topic: " + topic + ", listener key: " + listenerKey); // wait while the condition to resume is fulfilled LOG.info("resuming topic: " + topic + ", listener key: " + listenerKey); listener.getListenerContainer(listenerKey).resume(); LOG.info("topic resumed: " + topic + ", listener key: " + listenerKey); } } } catch (Exception e){ LOG.error("", e); } }
from https://stackoverflow.com/questions/48402355/spring-kafkalistener-execute-and-poll-records-after-certain-interval by cc-by-sa and MIT license
'SPRING' 카테고리의 다른 글
[SPRING] 스프링이있는 타일 : 오류 발생 - java.lang.ClassNotFoundException : org.apache.tiles.TilesApplicationContext (0) | 2019.04.20 |
---|---|
[SPRING] onSave () (Hibernate / Spring 데이터 저장소와 함께 저장된 엔티티) (0) | 2019.04.20 |
[SPRING] WebAppContext가 시작되지 않으면 시작 또는 종료 부두를 취소하는 방법 (0) | 2019.04.19 |
[SPRING] Eclipse에서 메모리 부족 오류 (0) | 2019.04.19 |
[SPRING] 컨텍스트 초기화에 실패했습니다. (0) | 2019.04.19 |