복붙노트

[SCALA] 많은 작은 파일을 작성 dataframe 쓰기 방법을 불꽃

SCALA

많은 작은 파일을 작성 dataframe 쓰기 방법을 불꽃

나는 마루하기 위해 로그 파일을 coverting 아주 간단한 일이있어. 약 12,000 파일입니다, 그것은 (우리의 블록 크기가 128메가바이트입니다 - - 128 메가 바이트 파일 64메가바이트로 청크) 데이터의 1.1TB를 처리합니다.

다음과 같이 작업 작동 :

 val events = spark.sparkContext
  .textFile(s"$stream/$sourcetype")
  .map(_.split(" \\|\\| ").toList)
  .collect{case List(date, y, "Event") => MyEvent(date, y, "Event")}
  .toDF()

df.write.mode(SaveMode.Append).partitionBy("date").parquet(s"$path")

그것은 일반적인 스키마와 이벤트를 수집하는 DataFrame로 변환 한 다음 마루로 기록합니다.

제가하는 데 문제는 너무 많은 작은 파일을 생성하려고로이는 HDFS 클러스터에 IO 폭발의 비트를 만들 수 있다는 것입니다.

이상적으로는 파티션 '날짜'내 나무 마루 파일의 소수를 만들려고합니다.

이를 제어하는 ​​가장 좋은 방법이 있을까요? 그것은 '유착을 ()'를 사용하여인가?

어떻게 그런 효과 것이다 주어진 파티션에 생성 된 파일의 양? 내가이 불꽃에서 일하고 얼마나 많은 집행에 따라인가? (현재는 100로 설정).

해결법

  1. ==============================

    1.당신은 DataFrameWriter의 분할을 일치하도록 DataFrame을 repartiton해야

    당신은 DataFrameWriter의 분할을 일치하도록 DataFrame을 repartiton해야

    이 시도:

    df
    .repartition($"date")
    .write.mode(SaveMode.Append)
    .partitionBy("date")
    .parquet(s"$path")
    
  2. ==============================

    2.가장 간단한 솔루션으로 실제 파티션을 대체하는 것입니다 :

    가장 간단한 솔루션으로 실제 파티션을 대체하는 것입니다 :

    df
     .repartition(to_date($"date"))
     .write.mode(SaveMode.Append)
     .partitionBy("date")
     .parquet(s"$path")
    

    또한 시간 범위의 하루 어쩌면 시간 즉 당신의 DataFrame에 대한보다 정확한 분할을 사용할 수 있습니다. 다음은 작가 덜 정확한 될 수 있습니다. 즉 실제로 데이터의 양에 따라 달라집니다.

    당신은 DataFrame 및 조항에 의해 파티션 쓰기를 분할하여 엔트로피를 감소시킬 수있다.

  3. ==============================

    3.파이썬에서는 라파엘의 로스의 대답으로 다시 작성할 수 있습니다 :

    파이썬에서는 라파엘의 로스의 대답으로 다시 작성할 수 있습니다 :

    (df
      .repartition("date")
      .write.mode("append")
      .partitionBy("date")
      .parquet("{path}".format(path=path)))
    

    당신은 또한 매우 큰 파티션이있는 문제를 방지하기 위해 .repartition 더 많은 열을 추가하는 것이 좋습니다 :

    (df
      .repartition("date", another_column, yet_another_colum)
      .write.mode("append")
      .partitionBy("date)
      .parquet("{path}".format(path=path)))
    
  4. ==============================

    4.저도 같은 문제를 가로 질러 와서 나는 유착 내 문제를 해결하여 할 수있다.

    저도 같은 문제를 가로 질러 와서 나는 유착 내 문제를 해결하여 할 수있다.

    df
      .coalesce(3) // number of parts/files 
      .write.mode(SaveMode.Append)
      .parquet(s"$path")
    

    병합 또는 재분할 자세한 유착하거나 다음 스파크를 참조 할 수 있습니다 다시 분할 사용에 대한 자세한 내용은

  5. ==============================

    5.여기에서 내 대답을 복제 : https://stackoverflow.com/a/53620268/171916

    여기에서 내 대답을 복제 : https://stackoverflow.com/a/53620268/171916

    이것은 아주 잘 나를 위해 노력하고 있습니다 :

    data.repartition(n, "key").write.partitionBy("key").parquet("/location")
    

    그것은 (다시, 일화, 내 데이터 세트에) 빠른 출력 만에 다시 분할보다는 각 출력 파티션 (디렉토리)에서 N 파일을 생성, 빠른 병합 사용하는 것보다 (일화)입니다.

    당신이 S3와 함께 작업하는 경우, 또한 (스파크 쓰기 아웃시 파일 생성 / 이름 변경 / 삭제를 많이 수행)하고 모든 정착 사용 하둡 FileUtil (또는 단지 AWS의 CLI는) 모든 것을 복사 할 된 후에는 로컬 드라이브에 최선을 다하고 추천 위에:

    import java.net.URI
    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
    // ...
      def copy(
              in : String,
              out : String,
              sparkSession: SparkSession
              ) = {
        FileUtil.copy(
          FileSystem.get(new URI(in), sparkSession.sparkContext.hadoopConfiguration),
          new Path(in),
          FileSystem.get(new URI(out), sparkSession.sparkContext.hadoopConfiguration),
          new Path(out),
          false,
          sparkSession.sparkContext.hadoopConfiguration
        )
      }
    
  6. ==============================

    6.방법 하나에 모든 마루 파일을 통합지도 작업으로 다음과 같은 스크립트 실행을 시도에 대해 :

    방법 하나에 모든 마루 파일을 통합지도 작업으로 다음과 같은 스크립트 실행을 시도에 대해 :

    $의 하둡 항아리 \ /usr/hdp/2.3.2.0-2950/hadoop-mapreduce/hadoop-streaming-2.7.1.2.3.2.0-2950.jar                    -Dmapred.reduce.tasks = 1 \                    -input "/ HDFS / 입 / DIR"\                    - 출력 "/ HDFS / 출력 / DIR"\                    -mapper 고양이 \                    -reducer 고양이

  7. from https://stackoverflow.com/questions/44459355/spark-dataframe-write-method-writing-many-small-files by cc-by-sa and MIT license