복붙노트

[HADOOP] 스파크 스트리밍 출력 지속

HADOOP

스파크 스트리밍 출력 지속

나는 메시징 응용 프로그램에서 데이터를 수집하고 있는데, 현재 Flume을 사용 중이며 하루에 약 5 천만 레코드를 보냅니다.

나는 카프카를 사용하고 싶다. Spark Streaming을 사용하여 Kafka에서 소비 impo를 사용하여 hadoop 및 쿼리에 지속

내가 시도한 각 접근법에 문제가있다.

접근법 1 - rdd를 마루로 저장하고 외부 하이브 마루 테이블을 쪽모 세공 디렉토리로 향하게하십시오

// scala
val ssc =  new StreamingContext(sparkConf, Seconds(bucketsize.toInt))
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
lines.foreachRDD(rdd => {

    // 1 - Create a SchemaRDD object from the rdd and specify the schema
    val SchemaRDD1 = sqlContext.jsonRDD(rdd, schema)

    // 2 - register it as a spark sql table
    SchemaRDD1.registerTempTable("sparktable")

    // 3 - qry sparktable to produce another SchemaRDD object of the data needed 'finalParquet'. and persist this as parquet files
    val finalParquet = sqlContext.sql(sql)
    finalParquet.saveAsParquetFile(dir)

문제는 finalParquet.saveAsParquetFile이 큰 번호를 출력한다는 것입니다. Kafka에서받은 Dstream은 1 분의 배치 크기로 200 개가 넘는 파일을 출력합니다. saveAsTextFile NOT 출력을 여러 파일로 분할하지 않는 다른 방법으로 계산이 분산되어 있기 때문에 많은 파일을 출력하는 이유는 무엇입니까? propsed 해결책은 나를 위해 최적의 것처럼 보이지 않는다. 한 명의 사용자가 말하기를 - 단일 출력 파일을 갖는 것은 데이터가 거의없는 경우에만 좋은 아이디어입니다.

접근법 2 - 하이브리드 컨텍스트 사용. 하이브 테이블에 직접 rdd 데이터를 삽입하십시오.

# python
sqlContext = HiveContext(sc)
ssc = StreamingContext(sc, int(batch_interval))
kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topics: 1})
lines = kvs.map(lambda x: x[1]).persist(StorageLevel.MEMORY_AND_DISK_SER)
lines.foreachRDD(sendRecord)

def sendRecord(rdd):

  sql = "INSERT INTO TABLE table select * from beacon_sparktable"

  # 1 - Apply the schema to the RDD creating a data frame 'beaconDF'
  beaconDF = sqlContext.jsonRDD(rdd,schema)

  # 2- Register the DataFrame as a spark sql table.
  beaconDF.registerTempTable("beacon_sparktable")

  # 3 - insert to hive directly from a qry on the spark sql table
  sqlContext.sql(sql);

이것은 잘 동작하지만, 그것은 쪽매 테이블에 직접 삽입하지만 처리 시간이 일괄 처리 간격 시간을 초과하면 일괄 처리 일정이 지연됩니다. 소비자는 생산되지 않는 것을 유지하고 처리 할 배치는 대기열에 들어가기 시작합니다.

하이브에 쓰기가 느린 것 같습니다. 배치 간격 크기를 조정하고 더 많은 소비자 인스턴스를 실행 해 보았습니다.

요약하자면

스파이크 스트리밍의 빅 데이터를 유지하는 가장 좋은 방법은 여러 파일에 문제가 있고 하이브에 쓰기가 잠재적으로 지연된다는 것입니다. 다른 사람들은 무엇을하고 있습니까?

비슷한 질문이 여기에 있지만, 그는 너무 많은 파일에 apposed 디렉토리 문제가 있습니다 임팔라가 그것을 읽을 수 있도록 스파크 스트리밍의 출력물을 만드는 방법?

많은 도움을 많이 주셔서 감사합니다.

해결법

  1. ==============================

    1.솔루션 # 2에서 생성 된 파일 수는 각 RDD의 파티션 수를 통해 제어 할 수 있습니다.

    솔루션 # 2에서 생성 된 파일 수는 각 RDD의 파티션 수를 통해 제어 할 수 있습니다.

    다음 예제를 참조하십시오.

    // create a Hive table (assume it's already existing)
    sqlContext.sql("CREATE TABLE test (id int, txt string) STORED AS PARQUET")
    
    // create a RDD with 2 records and only 1 partition
    val rdd = sc.parallelize(List( List(1, "hello"), List(2, "world") ), 1)
    
    // create a DataFrame from the RDD
    val schema = StructType(Seq(
     StructField("id", IntegerType, nullable = false),
     StructField("txt", StringType, nullable = false)
    ))
    val df = sqlContext.createDataFrame(rdd.map( Row(_:_*) ), schema)
    
    // this creates a single file, because the RDD has 1 partition
    df.write.mode("append").saveAsTable("test")
    

    이제 Kafka에서 데이터를 가져 오는 빈도와 각 RDD의 파티션 수 (기본값, 다시 분할하여 줄일 수있는 Kafka 항목의 파티션)를 활용 해보십시오.

    CDH 5.5.1에서 Spark 1.5를 사용하고 있는데 df.write.mode ( "append"). saveAsTable ( "test") 또는 SQL 문자열을 사용하여 동일한 결과를 얻습니다.

  2. from https://stackoverflow.com/questions/32885825/persisting-spark-streaming-output by cc-by-sa and MIT license