[SCALA] 카프카 소비자는 이벤트를 반환하지
SCALA카프카 소비자는 이벤트를 반환하지
스칼라 아래 카프카의 소비자는 폴 호출에서 이벤트를 반환하지 않습니다.
그러나 화제가 정확한지, 나는 이벤트가 콘솔 소비자를 사용하여 주제에 전송되는 것을 볼 수 있습니다 :
/opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my_topic --from-beginning
나는 또한) (내가 디버거 및 kafkaConsumer.listTopics 된 invoke 그것을 통해 단계 때 아래에있는 내 스칼라 코드 샘플에있는 항목을 참조하십시오
또한, 이것은 그래서 나는 단지이 특성과 소비자 (즉, 다른 소비자 인스턴스가 메시지를 소비 할 수 없음)의 인스턴스를 만드는거야, 하나의 단위 테스트에서 호출됩니다. 또한 임의의 GROUP_ID을 사용하고 있습니다.
코드 / 구성 아래에 거기에 아무것도 잘못인가?
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.util.Random
trait KafkaTest {
val kafkaConsumerProperties = new Properties()
kafkaConsumerProperties.put("bootstrap.servers", "kafka:9092")
kafkaConsumerProperties.put("group.id", Random.alphanumeric.take(10).mkString)
kafkaConsumerProperties.put("key.deserializer", classOf[ByteArrayDeserializer])
kafkaConsumerProperties.put("value.deserializer", classOf[StringDeserializer])
val kafkaConsumer = new KafkaConsumer[String, String](kafkaConsumerProperties)
kafkaConsumer.subscribe(java.util.Collections.singletonList("my_topic"))
def checkKafkaHasReceivedEvent(): Assertion = {
val kafkaEvents = kafkaConsumer.poll(2000) // Always returns 0 events?
...
}
}
폴 시간 제한을 늘리면 도움이되지 않습니다.
해결법
-
==============================
1.AUTO_OFFSET_RESET_CONFIG 속성을 처음부터 읽으려면 기본적으로 초기에 설정해야합니다 그것을 "최신"
AUTO_OFFSET_RESET_CONFIG 속성을 처음부터 읽으려면 기본적으로 초기에 설정해야합니다 그것을 "최신"
kafkaConsumerProperties.put( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase())
from https://stackoverflow.com/questions/53867775/kafka-consumer-not-returning-any-events by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 좋은 scalaz 소개 [마감] (0) | 2019.11.24 |
---|---|
[SCALA] 데이터베이스를 사용하여 JDBC에 스파크 데이터 집합을 쓸 수 없음 (0) | 2019.11.24 |
[SCALA] 아파치 스파크 :하지 않을 때는 mapPartition 및 foreachPartition를 사용하는? (0) | 2019.11.24 |
[SCALA] JSON 파일이 아닌 폴더로 RDD 데이터를 저장하는 방법 (0) | 2019.11.24 |
[SCALA] 어떻게 요소의 액세스 스파크 RDD 배열에 인덱스를 기반으로 (0) | 2019.11.24 |