복붙노트

[SPRING] Spring @ KafkaListener는 일정한 간격 후에 레코드를 실행하고 폴링합니다.

SPRING

Spring @ KafkaListener는 일정한 간격 후에 레코드를 실행하고 폴링합니다.

일정한 간격 (예 : 5 분마다) 후에 레코드를 소비하려고했습니다. 소비자 자산은 표준입니다.

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(1);
    factory.setBatchListener(true);
    factory.getContainerProperties().setPollTimeout(300000);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
    return factory;
}

비록 내가 속성을 변경할 때 PoolTimeout은 정의 된 간격 (5 분) 후에 폴링하지 않고 30 초 후에 계속 폴링하며 여기에 내 로그가 있습니다.

2018-01-23 18:07:26.875 INFO 60905 --- [        2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer   : Consumed: san@1516710960000->1516711080000 2

2018-01-23 18:07:56.901 INFO 60905 --- [        2-0-C-1] c.t.k.s.consumer.FavoriteEventConsumer   : Consumed: san@1516710960000->1516711080000 4

우리는 windowed aggregations을 가진 kafka stream application을 만들고 y interval 후에 window x을 소비하려고 계획하고있었습니다.

클래스에서 KafkaMessageListenerContainer, setConsumerTaskExecutor가 설정되었음을 알 수 있습니다.

if (containerProperties.getConsumerTaskExecutor() == null) {
        SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                (getBeanName() == null ? "" : getBeanName()) + "-C-");
        containerProperties.setConsumerTaskExecutor(consumerExecutor);
    }

그러나이 (빈도) 스레드 풀이 레코드를 폴링 할 때 어떻게 구성해야합니까? 어떤 도움을 주셔서 감사합니다.

해결법

  1. ==============================

    1.소비자가 투표하는 비율을 제어 할 수는 없습니다. pollTimeout은 poll ()이 새 레코드가 도착하기를 기다리는 시간입니다. 새로운 기록이 더 자주 도착하면 오래 기다리지 않을 것입니다.

    소비자가 투표하는 비율을 제어 할 수는 없습니다. pollTimeout은 poll ()이 새 레코드가 도착하기를 기다리는 시간입니다. 새로운 기록이 더 자주 도착하면 오래 기다리지 않을 것입니다.

    레코드를받는 속도를 제어하려면 DefatulKafkaConsumerFactory를 사용하여 소비자를 만들고 원하는 경우 언제든지 폴링하십시오.

    그래도 @KafkaListener와 함께 사용할 수는 없습니다. 직접 기록을 처리해야합니다.

  2. ==============================

    2.Spring @ KafkaListener를 사용하는 Kafka 소비자의 속도를 제어하려면 다음 방법으로 KafkaListenerEndpointRegistry bean 사용을 autowire하고 필요한 MessageListenerContainer에 액세스하십시오. 그런 다음 pause () 및 resume () 기능을 사용하여 필요한 동작을 제어 할 수 있습니다.

    Spring @ KafkaListener를 사용하는 Kafka 소비자의 속도를 제어하려면 다음 방법으로 KafkaListenerEndpointRegistry bean 사용을 autowire하고 필요한 MessageListenerContainer에 액세스하십시오. 그런 다음 pause () 및 resume () 기능을 사용하여 필요한 동작을 제어 할 수 있습니다.

    @Autowired
    private KafkaListenerEndpointRegistry listener;
    
    @Autowired
    private Map<String, Set<String>> getTopicListenerMap(){
        List<String> ids = new ArrayList<>(listener.getListenerContainerIds());
        Map<String, Set<String>> topicListenerMap = new HashMap<>();
        for(String topic: topics){
            topicListenerMap.put(topic, new HashSet<>());
        }
        for(String key: ids){
            for (String topic : listener.getListenerContainer(key).getContainerProperties().getTopics()){
                topicListenerMap.get(topic).add(key);
            }
        }
        return topicListenerMap;
    }
    
    @KafkaListener(topics = "topic", containerFactory = "smsListener")
    public void listenWithHeaders(@Payload List<String> messageList, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitionList,
                                  @Header(KafkaHeaders.OFFSET) List<Integer> offsetList) {
        try{
            LOG.info("Received message count: "+(messageList!=null ? messageList.size(): 0)+", offset start: "+offsetList.get(0)+", end: "+offsetList.get(offsetList.size()-1));
            pauseIfRequired(topic);
            for(int i=0; i<messageList.size(); i++){
                // process the messages
            }
        }catch (Exception e){
            LOG.error("", e);
        }finally {
            resumeIfPaused(topic);
        }
    }
    
    private void pauseIfRequired(String topic){
        try{ 
            boolean flag = pausingCondition;
            if(flag){
                LOG.info("pausing topic: "+topic);
                for(String listenerKey: getTopicListenerMap().get(topic)){
                    listener.getListenerContainer(listenerKey).pause();
                }
                LOG.info("topic paused: "+topic);
            }
        } catch (Exception e){
            LOG.error("", e);
        }
    }
    
    private void resumeIfPaused(String topic){
        try {
            for (String listenerKey : getTopicListenerMap().get(topic)) {
                LOG.info("topic: "+topic+", containerPauseRequested: "+listener.getListenerContainer(listenerKey).isPauseRequested());
                if (listener.getListenerContainer(listenerKey).isPauseRequested()) {
                    LOG.info("waiting to resume topic: " + topic + ", listener key: " + listenerKey);
                    // wait while the condition to resume is fulfilled
                    LOG.info("resuming topic: " + topic + ", listener key: " + listenerKey);
                    listener.getListenerContainer(listenerKey).resume();
                    LOG.info("topic resumed: " + topic + ", listener key: " + listenerKey);
                }
            }
        } catch (Exception e){
            LOG.error("", e);
        }
    }
    
  3. from https://stackoverflow.com/questions/48402355/spring-kafkalistener-execute-and-poll-records-after-certain-interval by cc-by-sa and MIT license