복붙노트

[SPRING] 여러 KafkaListenerContainerFactory 추가 문제

SPRING

여러 KafkaListenerContainerFactory 추가 문제

안녕하세요. 저는 현재 Spring Kafka에서 협연 중이며 단일 KafkaListenerContainerFactory를 리스너에 추가하는 데 성공했습니다. 이제 여러 개의 KafkaListenerContainerFactorys를 추가하고 싶습니다. (하나는 json에 메시지가 있고 다른 하나는 문자열에 대한 것입니다.) 아래 코드를 참조하십시오.

@EnableKafka
@Configuration
public class KafkaConsumersConfig {

    private final KafkaConfiguration kafkaConfiguration;

    @Autowired
    public KafkaConsumersConfig(KafkaConfiguration kafkaConfiguration) {
        this.kafkaConfiguration = kafkaConfiguration;
    }

    @Bean
    public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String,Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(jsonConsumerFactory());
        factory.setConcurrency(3);
        factory.setAutoStartup(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String,Record> jsonConsumerFactory(){
        JsonDeserializer<Record> jsonDeserializer = new JsonDeserializer<>(Record.class);
        return new DefaultKafkaConsumerFactory<>(jsonConsumerConfigs(),new StringDeserializer(), jsonDeserializer);
    }

    @Bean
    public Map<String,Object> jsonConsumerConfigs(){
        Map<String,Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,  kafkaConfiguration.getBrokerAddress());
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getJsonGroupId());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval());
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout());
        return propsMap;
    }
    @Bean
    public KafkaListenerContainerFactory<?> kafkaFileListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(fileConsumerFactory());
        factory.setConcurrency(3);
        factory.setAutoStartup(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String,String> fileConsumerFactory(){
        return new DefaultKafkaConsumerFactory<>(fileConsumerConfigs());
    }

    @Bean
    public Map<String,Object> fileConsumerConfigs(){
        Map<String,Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,  kafkaConfiguration.getBrokerAddress());
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getFileGroupId());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval());
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout());
        return propsMap;
    }
}

이것을 실행하면 다음과 같은 오류가 발생합니다.

Description:

Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
    - Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found beans 'jsonConsumerFactory', 'fileConsumerFactory'


Action:

Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.

내가 뭘 잘못하고 있죠?

해결법

  1. ==============================

    1.Spring Boot의 Kafka Auto Configuration에 의존하지 않는 것처럼 보입니다.

    Spring Boot의 Kafka Auto Configuration에 의존하지 않는 것처럼 보입니다.

    Spring Boot는 KafkaAutoConfiguration에서 제공합니다 :

    @Bean
    @ConditionalOnMissingBean(ConsumerFactory.class)
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
    

    jsonConsumerFactory와 fileConsumerFactory가 있으므로 auto-config에서 제공 한 것보다 우선합니다.

    그러나 KafkaAnnotationDrive Configuration에서는 어떤 공장도 적용 할 수 없습니다.

    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    

    ConsumerFactory 빈은 ConsumerFactory 유형이 아니기 때문에

    그래서:

  2. ==============================

    2.다음과 같이 KafkaListener 정의에서 각 소비자 팩토리를 정의 할 수 있습니다.

    다음과 같이 KafkaListener 정의에서 각 소비자 팩토리를 정의 할 수 있습니다.

    @KafkaListener(topics = "fileTopic", containerFactory = "fileConsumerFactory")
    public void fileConsumer(...) {...}
    
    @KafkaListener(topics = "jsonTopic", containerFactory = "jsonConsumerFactory")
    public void jsonConsumer(...) {...}
    
  3. ==============================

    3.나는 코드와 그것의 저를 위해 잘 아래에 그것을 달성했다.

    나는 코드와 그것의 저를 위해 잘 아래에 그것을 달성했다.

    // LISTENER 1
    @Bean
    @ConditionalOnMissingBean(name = "yourListenerFactory1")
    public ConsumerFactory<String, YourCustomObject1> yourConsumerFactory1() {
       Map<String, Object> props = new HashMap<>();
       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       props.put(ConsumerConfig.GROUP_ID_CONFIG, "YOUR-GROUP-1");
       return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
         new JsonDeserializer<>(YourCustomObject1.class));
    }
    
    
    @Bean(name = "yourListenerFactory1")
    public ConcurrentKafkaListenerContainerFactory<String, YourCustomObject1> 
      yourListenerFactory1() {
       ConcurrentKafkaListenerContainerFactory<String, YourCustomObject1> factory =
           new ConcurrentKafkaListenerContainerFactory<>();
       factory.setConsumerFactory(yourConsumerFactory1());
       ContainerProperties containerProperties = factory.getContainerProperties();
       containerProperties.setPollTimeout(...);
       containerProperties.setAckMode(AckMode...);
       return factory;
    }
    
    
    // LISTENER 2
    @Bean
    @ConditionalOnMissingBean(name = "yourListenerFactory2")
    public ConsumerFactory<String, YourCustomObject2> yourConsumerFactory2() {
       Map<String, Object> props = new HashMap<>();
       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       props.put(ConsumerConfig.GROUP_ID_CONFIG, "YOUR-GROUP-2");
       return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
          new JsonDeserializer<>(YourCustomObject2.class));
    }
    
    
    @Bean(name = "yourListenerFactory2")
    public ConcurrentKafkaListenerContainerFactory<String, YourCustomObject2> 
       yourListenerFactory2() {
        ConcurrentKafkaListenerContainerFactory<String, YourCustomObject2> factory 
             =  new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(yourConsumerFactory2());
        ContainerProperties containerProperties = factory.getContainerProperties();
        containerProperties.setPollTimeout(...);
        containerProperties.setAckMode(AckMode...);
        return factory;
     }
    

    또한, spring.autoconfigure.exclude 속성을 ITS로 설정해야합니다. spring.autoconfigure.exclude = org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration

    이것은 나의 소비자 설정이다.

    소비자 1

    @KafkaListener(id = "your-cousumer-1",
      topicPattern = "your-topic-1",
      containerFactory = "yourListenerFactory1")
     public void consumer1(YourCustomObject1 data,
                           Acknowledgment acknowledgment,
          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
          @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
          @Header(KafkaHeaders.OFFSET) List<Long> offsets) throws Exception { ... }
    

    소비자 2

      @KafkaListener(id = "your-cousumer-2",
                     topicPattern = "your-topic-2",
                     containerFactory = "yourListenerFactory2")
      public void consumer2(YourCustomObject2 data,
                            Acknowledgment acknowledgment,
          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
          @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
          @Header(KafkaHeaders.OFFSET) List<Long> offsets) throws Exception { ...  }
    

    또한, 내 카프카 템플릿은

    @Autowired
    KafkaTemplate<String, Object> kafkaTemplate;
    
  4. from https://stackoverflow.com/questions/43142295/problems-adding-multiple-kafkalistenercontainerfactories by cc-by-sa and MIT license