[SCALA] 스트리밍 스파크 - 읽기 및 쓰기 카프카의 주제에
SCALA스트리밍 스파크 - 읽기 및 쓰기 카프카의 주제에
나는 두 카프카 큐 사이에 프로세스 데이터를 스트리밍 불꽃을 사용하고 있지만 스파크에서 카프카에 쓰기에 좋은 방법을 찾을 수없는 것. 나는이 시도했다 :
input.foreachRDD(rdd =>
rdd.foreachPartition(partition =>
partition.foreach {
case x: String => {
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
println(x)
val producer = new KafkaProducer[String, String](props)
val message = new ProducerRecord[String, String]("output", null, x)
producer.send(message)
}
}
)
)
것이 아니라 모든 메시지에 대한 새로운 KafkaProducer을 인스턴스화하는 것은 실제 상황에서 명확하게 실행할 수 없게하고 나는 그것을 해결하려면 노력하고있어 같이 그것을 작동합니다.
나는 모든 프로세스에 대해 하나의 인스턴스에 대한 참조를 유지하기 위해 좋아하고 내가 메시지를 보낼 필요로 할 때 액세스 것입니다. 나는 스파크 스트리밍에서 카프카에 어떻게 쓸 수 있는가?
해결법
-
==============================
1.나의 첫번째 조언은 그 사용자의 요구가 충분히 빠른 경우 foreachPartition에 새로운 인스턴스를 생성하고 측정하려고하는 것 (foreachPartition에 무거운 물체를 인스턴스화하는 공식 문서에서 알 것입니다).
나의 첫번째 조언은 그 사용자의 요구가 충분히 빠른 경우 foreachPartition에 새로운 인스턴스를 생성하고 측정하려고하는 것 (foreachPartition에 무거운 물체를 인스턴스화하는 공식 문서에서 알 것입니다).
이 예와 같이 또 다른 옵션은 오브젝트 풀을 사용하는 것입니다 :
https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala
나는 그러나 검사 점을 사용하는 경우는 하드 구현 발견했다.
나를 위해 잘 작동하면 다음 블로그 게시물에 설명 된대로 공장 또 다른 버전, 당신은 그냥 (코멘트 섹션을 확인) 귀하의 요구에 대한 충분한 병렬 처리를 제공하는지 확인해야합니다 :
http://allegro.tech/2015/08/spark-kafka-integration.html
-
==============================
2.네, 불행하게도 (1.x에서, 2.x에서) 효율적으로 카프카를 작성하는 방법을 직선 앞으로가되지 않습니다 스파크.
네, 불행하게도 (1.x에서, 2.x에서) 효율적으로 카프카를 작성하는 방법을 직선 앞으로가되지 않습니다 스파크.
나는 다음과 같은 방법을 건의 할 것입니다 :
다음은이 방법에 대한 높은 수준의 설정입니다 :
스파크 2.0로 불꽃 스트리밍과 작업 아래의 코드 조각.
1 단계 : 포장 KafkaProducer
import java.util.concurrent.Future import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable { /* This is the key idea that allows us to work around running into NotSerializableExceptions. */ lazy val producer = createProducer() def send(topic: String, key: K, value: V): Future[RecordMetadata] = producer.send(new ProducerRecord[K, V](topic, key, value)) def send(topic: String, value: V): Future[RecordMetadata] = producer.send(new ProducerRecord[K, V](topic, value)) } object MySparkKafkaProducer { import scala.collection.JavaConversions._ def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = { val createProducerFunc = () => { val producer = new KafkaProducer[K, V](config) sys.addShutdownHook { // Ensure that, on executor JVM shutdown, the Kafka producer sends // any buffered messages to Kafka before shutting down. producer.close() } producer } new MySparkKafkaProducer(createProducerFunc) } def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap) }
2 단계 : 사용 각 집행 인에게 자신의 포장 KafkaProducer 인스턴스를 제공하는 방송 변수
import org.apache.kafka.clients.producer.ProducerConfig val ssc: StreamingContext = { val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]") new StreamingContext(sparkConf, Seconds(1)) } ssc.checkpoint("checkpoint-directory") val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = { val kafkaProducerConfig = { val p = new Properties() p.setProperty("bootstrap.servers", "broker1:9092") p.setProperty("key.serializer", classOf[ByteArraySerializer].getName) p.setProperty("value.serializer", classOf[StringSerializer].getName) p } ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig)) }
3 단계 : 카프카 스트리밍 스파크 쓰기 재하여 (각 실행 프로그램의 경우) 같은 래핑 KafkaProducer 인스턴스
import java.util.concurrent.Future import org.apache.kafka.clients.producer.RecordMetadata val stream: DStream[String] = ??? stream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record => kafkaProducer.value.send("my-output-topic", record) }.toStream metadata.foreach { metadata => metadata.get() } } }
도움이 되었기를 바랍니다.
-
==============================
3.클라우 데라 의해 유지 스트리밍 카프카 기록기있다 (실제로 점화 JIRA로부터 분사 [1]). 그것은 기본적 요소 (잘하면 대형) 콜렉션 객체에 '무거운'을 만드는 데 소요되는 시간을 상각 파티션 당 프로듀서를 만듭니다.
클라우 데라 의해 유지 스트리밍 카프카 기록기있다 (실제로 점화 JIRA로부터 분사 [1]). 그것은 기본적 요소 (잘하면 대형) 콜렉션 객체에 '무거운'을 만드는 데 소요되는 시간을 상각 파티션 당 프로듀서를 만듭니다.
작가는 여기에서 찾을 수 있습니다 : https://github.com/cloudera/spark-kafka-writer
-
==============================
4.
// Subscribe to a topic and read messages from the earliest to latest offsets val ds= spark .readStream // use `read` for batch, like DataFrame .format("kafka") .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2") .option("subscribe", "source-topic1") .option("startingOffsets", "earliest") .option("endingOffsets", "latest") .load()
키와 값을 읽고 간단하게하기 위해 우리는 문자열 유형으로 둘 다 변환하고 있습니다, 모두에 대해 스키마를 적용합니다.
val dsStruc = ds.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .as[(String, String)]
dsStruc 스키마를 가지고 있기 때문에, 필터, AGG 같은 모든 SQL 종류의 작업을 허용, 그것은에 .. 등을 선택합니다.
dsStruc .writeStream // use `write` for batch, like DataFrame .format("kafka") .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2") .option("topic", "target-topic1") .start()
읽기 나 쓰기에 카프카 통합을위한 더 많은 구성
"org.apache.spark" % "spark-core_2.11" % 2.2.0, "org.apache.spark" % "spark-streaming_2.11" % 2.2.0, "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % 2.2.0,
-
==============================
5.저도 같은 문제가 발생하고이 게시물을 발견했다.
저도 같은 문제가 발생하고이 게시물을 발견했다.
저자는 집행 당 1 명 프로듀서를 작성하여 문제를 해결한다. 대신에 생산 자체를 전송, 그는 그것을 방송에 의해 집행 인의 프로듀서를 만드는 방법 만 "레시피"를 전송합니다.
val kafkaSink = sparkContext.broadcast(KafkaSink(conf))
그는 유유히 생산자를 생성하는 래퍼를 사용합니다 :
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable { lazy val producer = createProducer() def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value)) } object KafkaSink { def apply(config: Map[String, Object]): KafkaSink = { val f = () => { val producer = new KafkaProducer[String, String](config) sys.addShutdownHook { producer.close() } producer } new KafkaSink(f) } }
카프카 생산 단지 집행자에 처음 사용하기 전에 초기화되기 때문에 래퍼는 직렬화 가능합니다. 드라이버는 래퍼에 대한 참조를 유지하고 래퍼는 각 집행자의 프로듀서를 사용하여 메시지를 보냅니다 :
dstream.foreachRDD { rdd => rdd.foreach { message => kafkaSink.value.send("topicName", message) } }
-
==============================
6.왜 불가능합니까? 각 파티션의 작업의 시작에 연결 (및 동기화)를 다시 실행해야하므로, 기본적으로 각 RDD의 각 파티션은 독립적으로 실행하는 것입니다 (잘 다른 클러스터 노드에서 실행할 수 있습니다). 그 오버 헤드가 너무 높은 경우는 허용 될 때까지 당신은 당신 StreamingContext의 배치 크기를 증가한다 (OBV을.이 일에 지연 비용이있다).
왜 불가능합니까? 각 파티션의 작업의 시작에 연결 (및 동기화)를 다시 실행해야하므로, 기본적으로 각 RDD의 각 파티션은 독립적으로 실행하는 것입니다 (잘 다른 클러스터 노드에서 실행할 수 있습니다). 그 오버 헤드가 너무 높은 경우는 허용 될 때까지 당신은 당신 StreamingContext의 배치 크기를 증가한다 (OBV을.이 일에 지연 비용이있다).
(각 파티션에 수천 개의 메시지를 처리하지 않는 경우, 당신은 더 나은 독립 실행 형 응용 프로그램과 함께 무엇입니까? 당신은 당신이 불꽃 스트리밍 필요가 전혀겠습니까입니까?)
-
==============================
7.이것은 당신이 무엇을 원하는 수 있습니다. 당신은 기본적으로 기록의 각 파티션에 대해 하나 개의 생산을 만들 수 있습니다.
이것은 당신이 무엇을 원하는 수 있습니다. 당신은 기본적으로 기록의 각 파티션에 대해 하나 개의 생산을 만들 수 있습니다.
input.foreachRDD(rdd => rdd.foreachPartition( partitionOfRecords => { val props = new HashMap[String, Object]() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers) props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") val producer = new KafkaProducer[String,String](props) partitionOfRecords.foreach { case x:String=>{ println(x) val message=new ProducerRecord[String, String]("output",null,x) producer.send(message) } } }) )
희망이 도움이
from https://stackoverflow.com/questions/31590592/spark-streaming-read-and-write-on-kafka-topic by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] Scala.React의 상태는 무엇입니까? [닫은] (0) | 2019.11.18 |
---|---|
[SCALA] 명확한 설명은 스칼라에서 선물과 약속에 대한 필요 (0) | 2019.11.18 |
[SCALA] 인 IntelliJ와 스칼라 설정 (0) | 2019.11.18 |
[SCALA] 나는 구성하고 스칼라 / 리프트에서 JSON 문자열을 구문 분석 할 수있는 방법 (0) | 2019.11.18 |
[SCALA] 어떻게 스칼라에서 패턴 매칭은 바이트 코드 수준에서 구현됩니다? (0) | 2019.11.18 |