[SPRING] 여러 KafkaListenerContainerFactory 추가 문제
SPRING여러 KafkaListenerContainerFactory 추가 문제
안녕하세요. 저는 현재 Spring Kafka에서 협연 중이며 단일 KafkaListenerContainerFactory를 리스너에 추가하는 데 성공했습니다. 이제 여러 개의 KafkaListenerContainerFactorys를 추가하고 싶습니다. (하나는 json에 메시지가 있고 다른 하나는 문자열에 대한 것입니다.) 아래 코드를 참조하십시오.
@EnableKafka
@Configuration
public class KafkaConsumersConfig {
private final KafkaConfiguration kafkaConfiguration;
@Autowired
public KafkaConsumersConfig(KafkaConfiguration kafkaConfiguration) {
this.kafkaConfiguration = kafkaConfiguration;
}
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String,Record> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(jsonConsumerFactory());
factory.setConcurrency(3);
factory.setAutoStartup(true);
return factory;
}
@Bean
public ConsumerFactory<String,Record> jsonConsumerFactory(){
JsonDeserializer<Record> jsonDeserializer = new JsonDeserializer<>(Record.class);
return new DefaultKafkaConsumerFactory<>(jsonConsumerConfigs(),new StringDeserializer(), jsonDeserializer);
}
@Bean
public Map<String,Object> jsonConsumerConfigs(){
Map<String,Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBrokerAddress());
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getJsonGroupId());
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit());
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval());
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout());
return propsMap;
}
@Bean
public KafkaListenerContainerFactory<?> kafkaFileListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(fileConsumerFactory());
factory.setConcurrency(3);
factory.setAutoStartup(true);
return factory;
}
@Bean
public ConsumerFactory<String,String> fileConsumerFactory(){
return new DefaultKafkaConsumerFactory<>(fileConsumerConfigs());
}
@Bean
public Map<String,Object> fileConsumerConfigs(){
Map<String,Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.getBrokerAddress());
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfiguration.getFileGroupId());
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConfiguration.getAutoCommit());
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConfiguration.getAutoCommitInterval());
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaConfiguration.getSessionTimeout());
return propsMap;
}
}
이것을 실행하면 다음과 같은 오류가 발생합니다.
Description:
Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
- Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found beans 'jsonConsumerFactory', 'fileConsumerFactory'
Action:
Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.
내가 뭘 잘못하고 있죠?
해결법
-
==============================
1.Spring Boot의 Kafka Auto Configuration에 의존하지 않는 것처럼 보입니다.
Spring Boot의 Kafka Auto Configuration에 의존하지 않는 것처럼 보입니다.
Spring Boot는 KafkaAutoConfiguration에서 제공합니다 :
@Bean @ConditionalOnMissingBean(ConsumerFactory.class) public ConsumerFactory<?, ?> kafkaConsumerFactory() {
jsonConsumerFactory와 fileConsumerFactory가 있으므로 auto-config에서 제공 한 것보다 우선합니다.
그러나 KafkaAnnotationDrive Configuration에서는 어떤 공장도 적용 할 수 없습니다.
@Bean @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory") public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConsumerFactory 빈은 ConsumerFactory
그래서:
-
==============================
2.다음과 같이 KafkaListener 정의에서 각 소비자 팩토리를 정의 할 수 있습니다.
다음과 같이 KafkaListener 정의에서 각 소비자 팩토리를 정의 할 수 있습니다.
@KafkaListener(topics = "fileTopic", containerFactory = "fileConsumerFactory") public void fileConsumer(...) {...} @KafkaListener(topics = "jsonTopic", containerFactory = "jsonConsumerFactory") public void jsonConsumer(...) {...}
-
==============================
3.나는 코드와 그것의 저를 위해 잘 아래에 그것을 달성했다.
나는 코드와 그것의 저를 위해 잘 아래에 그것을 달성했다.
// LISTENER 1 @Bean @ConditionalOnMissingBean(name = "yourListenerFactory1") public ConsumerFactory<String, YourCustomObject1> yourConsumerFactory1() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "YOUR-GROUP-1"); return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(YourCustomObject1.class)); } @Bean(name = "yourListenerFactory1") public ConcurrentKafkaListenerContainerFactory<String, YourCustomObject1> yourListenerFactory1() { ConcurrentKafkaListenerContainerFactory<String, YourCustomObject1> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(yourConsumerFactory1()); ContainerProperties containerProperties = factory.getContainerProperties(); containerProperties.setPollTimeout(...); containerProperties.setAckMode(AckMode...); return factory; } // LISTENER 2 @Bean @ConditionalOnMissingBean(name = "yourListenerFactory2") public ConsumerFactory<String, YourCustomObject2> yourConsumerFactory2() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "YOUR-GROUP-2"); return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(YourCustomObject2.class)); } @Bean(name = "yourListenerFactory2") public ConcurrentKafkaListenerContainerFactory<String, YourCustomObject2> yourListenerFactory2() { ConcurrentKafkaListenerContainerFactory<String, YourCustomObject2> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(yourConsumerFactory2()); ContainerProperties containerProperties = factory.getContainerProperties(); containerProperties.setPollTimeout(...); containerProperties.setAckMode(AckMode...); return factory; }
또한, spring.autoconfigure.exclude 속성을 ITS로 설정해야합니다. spring.autoconfigure.exclude = org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
이것은 나의 소비자 설정이다.
소비자 1
@KafkaListener(id = "your-cousumer-1", topicPattern = "your-topic-1", containerFactory = "yourListenerFactory1") public void consumer1(YourCustomObject1 data, Acknowledgment acknowledgment, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics, @Header(KafkaHeaders.OFFSET) List<Long> offsets) throws Exception { ... }
소비자 2
@KafkaListener(id = "your-cousumer-2", topicPattern = "your-topic-2", containerFactory = "yourListenerFactory2") public void consumer2(YourCustomObject2 data, Acknowledgment acknowledgment, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics, @Header(KafkaHeaders.OFFSET) List<Long> offsets) throws Exception { ... }
또한, 내 카프카 템플릿은
@Autowired KafkaTemplate<String, Object> kafkaTemplate;
from https://stackoverflow.com/questions/43142295/problems-adding-multiple-kafkalistenercontainerfactories by cc-by-sa and MIT license
'SPRING' 카테고리의 다른 글
[SPRING] 스프링 데이터 저장소와 mongodb로 스프링 애플리케이션 설정하기 (0) | 2019.04.09 |
---|---|
[SPRING] Thymeleaf를 사용하여 Spring의 웹 기반 Scope에서 HTML 파일을 처리하고 처리 된 템플릿을 String으로 저장 (0) | 2019.04.09 |
[SPRING] 최대 절전 모드 : 외래 키의 열 수가 잘못되었습니다. (0) | 2019.04.09 |
[SPRING] 스프링 @ 트랜잭션 주석이 자동 배선으로 작동하지 않습니까? (0) | 2019.04.09 |
[SPRING] 무효 신임장에 대한 기본 인증이 / error로 리디렉션되는 스프링 보안 (0) | 2019.04.09 |