복붙노트

[SCALA] 카프카 소비자는 이벤트를 반환하지

SCALA

카프카 소비자는 이벤트를 반환하지

스칼라 아래 카프카의 소비자는 폴 호출에서 이벤트를 반환하지 않습니다.

그러나 화제가 정확한지, 나는 이벤트가 콘솔 소비자를 사용하여 주제에 전송되는 것을 볼 수 있습니다 :

/opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my_topic --from-beginning

나는 또한) (내가 디버거 및 kafkaConsumer.listTopics 된 invoke 그것을 통해 단계 때 아래에있는 내 스칼라 코드 샘플에있는 항목을 참조하십시오

또한, 이것은 그래서 나는 단지이 특성과 소비자 (즉, 다른 소비자 인스턴스가 메시지를 소비 할 수 없음)의 인스턴스를 만드는거야, 하나의 단위 테스트에서 호출됩니다. 또한 임의의 GROUP_ID을 사용하고 있습니다.

코드 / 구성 아래에 거기에 아무것도 잘못인가?

import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.util.Random

trait KafkaTest {

  val kafkaConsumerProperties = new Properties()

  kafkaConsumerProperties.put("bootstrap.servers", "kafka:9092")

  kafkaConsumerProperties.put("group.id", Random.alphanumeric.take(10).mkString)

  kafkaConsumerProperties.put("key.deserializer", classOf[ByteArrayDeserializer])

  kafkaConsumerProperties.put("value.deserializer", classOf[StringDeserializer])

  val kafkaConsumer = new KafkaConsumer[String, String](kafkaConsumerProperties)

kafkaConsumer.subscribe(java.util.Collections.singletonList("my_topic"))

  def checkKafkaHasReceivedEvent(): Assertion = {

    val kafkaEvents = kafkaConsumer.poll(2000) // Always returns 0 events?
    ...
  }
}

폴 시간 제한을 늘리면 도움이되지 않습니다.

해결법

  1. ==============================

    1.AUTO_OFFSET_RESET_CONFIG 속성을 처음부터 읽으려면 기본적으로 초기에 설정해야합니다 그것을 "최신"

    AUTO_OFFSET_RESET_CONFIG 속성을 처음부터 읽으려면 기본적으로 초기에 설정해야합니다 그것을 "최신"

    kafkaConsumerProperties.put(
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
        OffsetResetStrategy.EARLIEST.toString().toLowerCase())
    
  2. from https://stackoverflow.com/questions/53867775/kafka-consumer-not-returning-any-events by cc-by-sa and MIT license