복붙노트

[SCALA] 스트리밍 스파크 - 읽기 및 쓰기 카프카의 주제에

SCALA

스트리밍 스파크 - 읽기 및 쓰기 카프카의 주제에

나는 두 카프카 큐 사이에 프로세스 데이터를 스트리밍 불꽃을 사용하고 있지만 스파크에서 카프카에 쓰기에 좋은 방법을 찾을 수없는 것. 나는이 시도했다 :

input.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String => {
        val props = new HashMap[String, Object]()

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        println(x)
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
      }
    }
  )
)

것이 아니라 모든 메시지에 대한 새로운 KafkaProducer을 인스턴스화하는 것은 실제 상황에서 명확하게 실행할 수 없게하고 나는 그것을 해결하려면 노력하고있어 같이 그것을 작동합니다.

나는 모든 프로세스에 대해 하나의 인스턴스에 대한 참조를 유지하기 위해 좋아하고 내가 메시지를 보낼 필요로 할 때 액세스 것입니다. 나는 스파크 스트리밍에서 카프카에 어떻게 쓸 수 있는가?

해결법

  1. ==============================

    1.나의 첫번째 조언은 그 사용자의 요구가 충분히 빠른 경우 foreachPartition에 새로운 인스턴스를 생성하고 측정하려고하는 것 (foreachPartition에 무거운 물체를 인스턴스화하는 공식 문서에서 알 것입니다).

    나의 첫번째 조언은 그 사용자의 요구가 충분히 빠른 경우 foreachPartition에 새로운 인스턴스를 생성하고 측정하려고하는 것 (foreachPartition에 무거운 물체를 인스턴스화하는 공식 문서에서 알 것입니다).

    이 예와 같이 또 다른 옵션은 오브젝트 풀을 사용하는 것입니다 :

    https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala

    나는 그러나 검사 점을 사용하는 경우는 하드 구현 발견했다.

    나를 위해 잘 작동하면 다음 블로그 게시물에 설명 된대로 공장 또 다른 버전, 당신은 그냥 (코멘트 섹션을 확인) 귀하의 요구에 대한 충분한 병렬 처리를 제공하는지 확인해야합니다 :

    http://allegro.tech/2015/08/spark-kafka-integration.html

  2. ==============================

    2.네, 불행하게도 (1.x에서, 2.x에서) 효율적으로 카프카를 작성하는 방법을 직선 앞으로가되지 않습니다 스파크.

    네, 불행하게도 (1.x에서, 2.x에서) 효율적으로 카프카를 작성하는 방법을 직선 앞으로가되지 않습니다 스파크.

    나는 다음과 같은 방법을 건의 할 것입니다 :

    다음은이 방법에 대한 높은 수준의 설정입니다 :

    스파크 2.0로 불꽃 스트리밍과 작업 아래의 코드 조각.

    1 단계 : 포장 KafkaProducer

    import java.util.concurrent.Future
    
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
    
    class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
    
      /* This is the key idea that allows us to work around running into
         NotSerializableExceptions. */
      lazy val producer = createProducer()
    
      def send(topic: String, key: K, value: V): Future[RecordMetadata] =
        producer.send(new ProducerRecord[K, V](topic, key, value))
    
      def send(topic: String, value: V): Future[RecordMetadata] =
        producer.send(new ProducerRecord[K, V](topic, value))
    
    }
    
    object MySparkKafkaProducer {
    
      import scala.collection.JavaConversions._
    
      def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
        val createProducerFunc = () => {
          val producer = new KafkaProducer[K, V](config)
    
          sys.addShutdownHook {
            // Ensure that, on executor JVM shutdown, the Kafka producer sends
            // any buffered messages to Kafka before shutting down.
            producer.close()
          }
    
          producer
        }
        new MySparkKafkaProducer(createProducerFunc)
      }
    
      def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)
    
    }
    

    2 단계 : 사용 각 집행 인에게 자신의 포장 KafkaProducer 인스턴스를 제공하는 방송 변수

    import org.apache.kafka.clients.producer.ProducerConfig
    
    val ssc: StreamingContext = {
      val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
      new StreamingContext(sparkConf, Seconds(1))
    }
    
    ssc.checkpoint("checkpoint-directory")
    
    val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
      val kafkaProducerConfig = {
        val p = new Properties()
        p.setProperty("bootstrap.servers", "broker1:9092")
        p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
        p.setProperty("value.serializer", classOf[StringSerializer].getName)
        p
      }
      ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
    }
    

    3 단계 : 카프카 스트리밍 스파크 쓰기 재하여 (각 실행 프로그램의 경우) 같은 래핑 KafkaProducer 인스턴스

    import java.util.concurrent.Future
    import org.apache.kafka.clients.producer.RecordMetadata
    
    val stream: DStream[String] = ???
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
          kafkaProducer.value.send("my-output-topic", record)
        }.toStream
        metadata.foreach { metadata => metadata.get() }
      }
    }
    

    도움이 되었기를 바랍니다.

  3. ==============================

    3.클라우 데라 의해 유지 스트리밍 카프카 기록기있다 (실제로 점화 JIRA로부터 분사 [1]). 그것은 기본적 요소 (잘하면 대형) 콜렉션 객체에 '무거운'을 만드는 데 소요되는 시간을 상각 파티션 당 프로듀서를 만듭니다.

    클라우 데라 의해 유지 스트리밍 카프카 기록기있다 (실제로 점화 JIRA로부터 분사 [1]). 그것은 기본적 요소 (잘하면 대형) 콜렉션 객체에 '무거운'을 만드는 데 소요되는 시간을 상각 파티션 당 프로듀서를 만듭니다.

    작가는 여기에서 찾을 수 있습니다 : https://github.com/cloudera/spark-kafka-writer

  4. ==============================

    4.

    // Subscribe to a topic and read messages from the earliest to latest offsets
    val ds= spark
      .readStream // use `read` for batch, like DataFrame
      .format("kafka")
      .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
      .option("subscribe", "source-topic1")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()
    

    키와 값을 읽고 간단하게하기 위해 우리는 문자열 유형으로 둘 다 변환하고 있습니다, 모두에 대해 스키마를 적용합니다.

    val dsStruc = ds.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
    

    dsStruc 스키마를 가지고 있기 때문에, 필터, AGG 같은 모든 SQL 종류의 작업을 허용, 그것은에 .. 등을 선택합니다.

    dsStruc
      .writeStream // use `write` for batch, like DataFrame
      .format("kafka")
      .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
      .option("topic", "target-topic1")
      .start()
    

    읽기 나 쓰기에 카프카 통합을위한 더 많은 구성

     "org.apache.spark" % "spark-core_2.11" % 2.2.0,
     "org.apache.spark" % "spark-streaming_2.11" % 2.2.0,
     "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % 2.2.0,
    
  5. ==============================

    5.저도 같은 문제가 발생하고이 게시물을 발견했다.

    저도 같은 문제가 발생하고이 게시물을 발견했다.

    저자는 집행 당 1 명 프로듀서를 작성하여 문제를 해결한다. 대신에 생산 자체를 전송, 그는 그것을 방송에 의해 집행 인의 프로듀서를 만드는 방법 만 "레시피"를 전송합니다.

        val kafkaSink = sparkContext.broadcast(KafkaSink(conf))
    

    그는 유유히 생산자를 생성하는 래퍼를 사용합니다 :

        class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
    
          lazy val producer = createProducer()
    
          def send(topic: String, value: String): Unit = producer.send(new     ProducerRecord(topic, value))
        }
    
    
        object KafkaSink {
          def apply(config: Map[String, Object]): KafkaSink = {
            val f = () => {
              val producer = new KafkaProducer[String, String](config)
    
              sys.addShutdownHook {
                producer.close()
              }
    
              producer
            }
            new KafkaSink(f)
          }
        }
    

    카프카 생산 단지 집행자에 처음 사용하기 전에 초기화되기 때문에 래퍼는 직렬화 가능합니다. 드라이버는 래퍼에 대한 참조를 유지하고 래퍼는 각 집행자의 프로듀서를 사용하여 메시지를 보냅니다 :

        dstream.foreachRDD { rdd =>
          rdd.foreach { message =>
            kafkaSink.value.send("topicName", message)
          }
        }
    
  6. ==============================

    6.왜 불가능합니까? 각 파티션의 작업의 시작에 연결 (및 동기화)를 다시 실행해야하므로, 기본적으로 각 RDD의 각 파티션은 독립적으로 실행하는 것입니다 (잘 다른 클러스터 노드에서 실행할 수 있습니다). 그 오버 헤드가 너무 높은 경우는 허용 될 때까지 당신은 당신 StreamingContext의 배치 크기를 증가한다 (OBV을.이 일에 지연 비용이있다).

    왜 불가능합니까? 각 파티션의 작업의 시작에 연결 (및 동기화)를 다시 실행해야하므로, 기본적으로 각 RDD의 각 파티션은 독립적으로 실행하는 것입니다 (잘 다른 클러스터 노드에서 실행할 수 있습니다). 그 오버 헤드가 너무 높은 경우는 허용 될 때까지 당신은 당신 StreamingContext의 배치 크기를 증가한다 (OBV을.이 일에 지연 비용이있다).

    (각 파티션에 수천 개의 메시지를 처리하지 않는 경우, 당신은 더 나은 독립 실행 형 응용 프로그램과 함께 무엇입니까? 당신은 당신이 불꽃 스트리밍 필요가 전혀겠습니까입니까?)

  7. ==============================

    7.이것은 당신이 무엇을 원하는 수 있습니다. 당신은 기본적으로 기록의 각 파티션에 대해 하나 개의 생산을 만들 수 있습니다.

    이것은 당신이 무엇을 원하는 수 있습니다. 당신은 기본적으로 기록의 각 파티션에 대해 하나 개의 생산을 만들 수 있습니다.

    input.foreachRDD(rdd =>
          rdd.foreachPartition(
              partitionOfRecords =>
                {
                    val props = new HashMap[String, Object]()
                    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
                    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer")
                    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer")
                    val producer = new KafkaProducer[String,String](props)
    
                    partitionOfRecords.foreach
                    {
                        case x:String=>{
                            println(x)
    
                            val message=new ProducerRecord[String, String]("output",null,x)
                            producer.send(message)
                        }
                    }
              })
    ) 
    

    희망이 도움이

  8. from https://stackoverflow.com/questions/31590592/spark-streaming-read-and-write-on-kafka-topic by cc-by-sa and MIT license