복붙노트

[SPRING] 임베디드 카프카 단위 테스트에서받은 메시지를 확인하는 방법

SPRING

임베디드 카프카 단위 테스트에서받은 메시지를 확인하는 방법

카프카 (Kafka) 주제에 메시지를 보내는 스프링 부팅 응용 프로그램을 만들었습니다. 나는 spring spring-integration-kafka를 사용하고있다 : KafkaProducerMessageHandler 은 채널 (SubscribableChannel)에 가입되어 수신 된 모든 메시지를 하나의 항목으로 푸시합니다. 응용 프로그램이 잘 작동합니다. 나는 콘솔 소비자 (지역 카프카)를 통해 카프카에 도착하는 메시지를 본다.

나는 또한 KafkaEmbedded를 사용하는 Integrationtest를 만듭니다. 테스트 중에 채널에 가입하여 예상 메시지를 확인하고 있습니다. 모두 정상입니다.

그러나 나는 테스트가 카프카에 넣어 진 메시지를 확인하기를 원합니다. 슬프게도 Kafka의 JavaDoc이 최고가 아닙니다. 내가 지금까지 시도한 것은 :

@ClassRule
public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, "myTopic");
//...
@Before
public void init() throws Exception {

    mockConsumer = new MockConsumer<>( OffsetResetStrategy.EARLIEST );
    kafkaEmbedded.consumeFromAnEmbeddedTopic( mockConsumer,"sikom" );

}
//...

@Test
public void endToEnd() throws Exception {
//  ...

    ConsumerRecords<String, String> records = mockConsumer.poll( 10000 );

    StreamSupport.stream(records.spliterator(), false).forEach( record -> log.debug( "record: " + record.value() ) );


}

문제는 내가 어떤 기록도 보지 못한다는 것이다. KafkaEmbedded 설정이 올바른지 확실하지 않습니다. 그러나 메시지는 채널에 의해 수신됩니다.

해결법

  1. ==============================

    1.이것은 나를 위해 작동합니다. 시도 해봐

    이것은 나를 위해 작동합니다. 시도 해봐

    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class KafkaEmbeddedTest {
    
        private static String SENDER_TOPIC = "testTopic";
    
        @ClassRule
        // By default it creates two partitions.
        public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC); 
    
        @Test
        public void testSend() throws InterruptedException, ExecutionException {
    
            Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
            //If you wish to send it to partitions other than 0 and 1, 
            //then you need to specify number of paritions in the declaration
    
            KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
            producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
            producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
            producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();
    
    
            Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sampleRawConsumer", "false", embeddedKafka);
            // Make sure you set the offset as earliest, because by the 
            // time consumer starts, producer might have sent all messages
            consumerProps.put("auto.offset.reset", "earliest");
    
            final List<String> receivedMessages = Lists.newArrayList();
            final CountDownLatch latch = new CountDownLatch(3);
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            executorService.execute(() -> {
                KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
                kafkaConsumer.subscribe(Collections.singletonList(SENDER_TOPIC));
                try {
                    while (true) {
                        ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
                        records.iterator().forEachRemaining(record -> {
                            receivedMessages.add(record.value());
                            latch.countDown();
                        });
                    }
                } finally {
                    kafkaConsumer.close();
                }
            });
    
        latch.await(10, TimeUnit.SECONDS);
        assertTrue(receivedMessages.containsAll(Arrays.asList("message00", "message01", "message10")));
        }
    }
    

    Producer.Send (..)는 비동기 작업이므로 카운트 다운 래치를 사용하고 있습니다. 그래서 내가 여기서하고있는 일은 카프카를 100 밀리 초마다 폴링하는 무한 루프에서 기다리는 것입니다. 만약 새로운 레코드가 있다면, 미래의 어설 션을 위해 목록에 추가 한 다음 카운트 다운을 줄입니다. 그리고 나는 단지 총 10 초를 기다릴 것입니다. 간단한 루프를 사용하고 몇 분 후에 종료 할 수 있습니다 (CountdownLatch 및 ExecutorService 항목을 사용하지 않으려는 경우)

  2. from https://stackoverflow.com/questions/48682745/embeddedkafka-how-to-check-received-messages-in-unit-test by cc-by-sa and MIT license