복붙노트

[SCALA] S3A가 매우 느립니다 동안 불꽃을 사용하여 S3에 마루 파일을 작성합니다

SCALA

S3A가 매우 느립니다 동안 불꽃을 사용하여 S3에 마루 파일을 작성합니다

나는 스파크 1.6.1을 사용하여 아마존 S3에 마루 파일을 작성하는 것을 시도하고있다. 내가 발생 해요 있다는 작은 마루는 그 많은 데이터하지 그래서 작성하면 ~ 2기가바이트입니다. 내가 사용할 수있는 플랫폼으로 불꽃을 증명하기 위해 노력하고있어.

기본적으로 내가 갈거야 것은 그때가 마루에에서 해당 테이블을 작성하는거야, dataframes와 스타 스키마를 설정하는 것입니다. 데이터는 공급 업체에서 제공 CSV 파일에서 들어오고 나는 ETL 플랫폼으로 불꽃을 사용하고 있습니다. 나는 현재 EC2에서 3 노드 클러스터 (r3.2xlarge) 그래서 1백20기가바이트 실행 프로그램과 16 개 코어의 메모리의 총이있다.

입력 파일은 22기가바이트에 대해 총 나는 지금 그 2GB의 데이터에 대해 추출하고있다. 나는 전체 데이터 집합을로드 시작할 때 결국이 테라 바이트가 될 것입니다.

여기 내 스파크 / 스칼라 의사 코드는 다음과 같습니다

  def loadStage(): Unit = {
    sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
    sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
    sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false")
    var sqlCtx = new SQLContext(sc)


    val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")

    //Setup header table/df
    val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1")
    val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
    val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6) ))
    val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
    header.registerTempTable("header")
    sqlCtx.cacheTable("header")


    //Setup fact table/df
    val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2")
    val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
    val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
    val df = sqlCtx.createDataFrame(records, factSchema)
    df.registerTempTable("fact")

    val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")


    println(results.count())



    results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")


  }

카운트는 465884512 행에 대한 이분 정도 걸립니다. 마루에 쓰기 38 분 소요

나는 유착이 쓰기를 수행하는 드라이버에 셔플을한다는 것을 이해 ....하지만 시간은의 복용은 제가 심각하게 뭔가 잘못하고 있다고 생각하고 있습니다. 유착없이이 아직이 IMO 너무 오래 아직 15 분, 소요 나에게 작은 마루 파일의 톤을 제공합니다. 내가해야하는 데이터의 하루에 하나 개의 큰 파일을 가지고 싶습니다. 나뿐만 아니라 필드 값에 의해 분할을 할 수있는 코드를 가지고 있고, 그냥 천천히 같다. 또한이 CSV로 출력을 시도하고 그 ~ 1 시간 소요됩니다.

내 작업을 제출있을 때 또한, 나는 정말 런타임 소품을 설정하고 있지 않다. 하나 개의 작업에 대한 나의 콘솔 통계는 다음과 같습니다

해결법

  1. ==============================

    1.스파크 기본값 S3에 쓰기 특히, I / O 작업을 수행하는 동안 (아마도) 불필요한 오버 헤드 많은 양의 원인이됩니다. 이 문서에서는 더 철저하게이 문제를 논의,하지만 당신은 변경을 고려하는 것이 좋습니다 2 설정이 있습니다.

    스파크 기본값 S3에 쓰기 특히, I / O 작업을 수행하는 동안 (아마도) 불필요한 오버 헤드 많은 양의 원인이됩니다. 이 문서에서는 더 철저하게이 문제를 논의,하지만 당신은 변경을 고려하는 것이 좋습니다 2 설정이 있습니다.

  2. ==============================

    2.직접 출사 커미터는 스파크 코드베이스로부터 사라지고; 당신은 당신의 자신의 JAR에서 삭제 된 코드 부활 자신의 / 쓰기한다. 당신이 그렇게 할 경우, 당신의 일에 떨어져 투기를 설정 및 기타 장애도 문제가 "잘못된 데이터가"여기서 문제가 발생할 수 있다는 것을 알고.

    직접 출사 커미터는 스파크 코드베이스로부터 사라지고; 당신은 당신의 자신의 JAR에서 삭제 된 코드 부활 자신의 / 쓰기한다. 당신이 그렇게 할 경우, 당신의 일에 떨어져 투기를 설정 및 기타 장애도 문제가 "잘못된 데이터가"여기서 문제가 발생할 수 있다는 것을 알고.

    밝은 노트에서 하둡 2.8은 최적화 된 바이너리 형식 S3 오프 (ORC, 마루)을 읽기위한 몇 가지 S3A의 속도 향상을 추가 할 것입니다; 자세한 내용은 하둡-11694를 참조하십시오. 그리고 어떤 사람들은 (1) 작업의 마지막에 커밋 강력한 O를 할 수 있어야 일관된 메타 데이터 저장소에 대해 아마존 디나모 사용에 노력하고 있습니다.

  3. ==============================

    3.나는 또한이 문제를 가지고 있었다. 나머지는 말에서 추가, 여기 AWS에서 완전한 설명은 다음과 같습니다 https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with -THE-emrfs는-S3 최적화 - 커미터 /

    나는 또한이 문제를 가지고 있었다. 나머지는 말에서 추가, 여기 AWS에서 완전한 설명은 다음과 같습니다 https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with -THE-emrfs는-S3 최적화 - 커미터 /

    단지 (V1)에서 FileOutCommiter v2로 변경 내 실험을하는 동안 쓰기 3-4 배 향상.

    self.sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")
    
  4. ==============================

    4.S3에 스파크 쓰기 속도를 즉시 접근 방법 중 하나는 EMRFS S3에 최적화 된 커미터를 사용하는 것입니다.

    S3에 스파크 쓰기 속도를 즉시 접근 방법 중 하나는 EMRFS S3에 최적화 된 커미터를 사용하는 것입니다.

    이 커미터 S3A 사용하는 경우에는 사용할 수 없습니다 :

    나는 AWS EMR 5.26에이 차이를 테스트 및 S3를 사용했습니다 : // 15 % -30 % 빠른 S3A 이상이었다 : // (하지만 여전히 느린).

    이러한 복사 / 쓰기를 달성하기 내가 관리했습니다 가장 빠른 방법은 S3에 복사 s3distcp를 사용하여 다음 로컬 HDFS에 마루를 작성하는 것이 었습니다; 하나 개의 특정 시나리오 (작은 파일의 수백)이 5 배 배 빠른 S3에 직접 마루에 DataFrame을 작성하는 것보다했다.

  5. from https://stackoverflow.com/questions/36927918/using-spark-to-write-a-parquet-file-to-s3-over-s3a-is-very-slow by cc-by-sa and MIT license