[SCALA] 많은 작은 파일을 작성 dataframe 쓰기 방법을 불꽃
SCALA많은 작은 파일을 작성 dataframe 쓰기 방법을 불꽃
나는 마루하기 위해 로그 파일을 coverting 아주 간단한 일이있어. 약 12,000 파일입니다, 그것은 (우리의 블록 크기가 128메가바이트입니다 - - 128 메가 바이트 파일 64메가바이트로 청크) 데이터의 1.1TB를 처리합니다.
다음과 같이 작업 작동 :
val events = spark.sparkContext
.textFile(s"$stream/$sourcetype")
.map(_.split(" \\|\\| ").toList)
.collect{case List(date, y, "Event") => MyEvent(date, y, "Event")}
.toDF()
df.write.mode(SaveMode.Append).partitionBy("date").parquet(s"$path")
그것은 일반적인 스키마와 이벤트를 수집하는 DataFrame로 변환 한 다음 마루로 기록합니다.
제가하는 데 문제는 너무 많은 작은 파일을 생성하려고로이는 HDFS 클러스터에 IO 폭발의 비트를 만들 수 있다는 것입니다.
이상적으로는 파티션 '날짜'내 나무 마루 파일의 소수를 만들려고합니다.
이를 제어하는 가장 좋은 방법이 있을까요? 그것은 '유착을 ()'를 사용하여인가?
어떻게 그런 효과 것이다 주어진 파티션에 생성 된 파일의 양? 내가이 불꽃에서 일하고 얼마나 많은 집행에 따라인가? (현재는 100로 설정).
해결법
-
==============================
1.당신은 DataFrameWriter의 분할을 일치하도록 DataFrame을 repartiton해야
당신은 DataFrameWriter의 분할을 일치하도록 DataFrame을 repartiton해야
이 시도:
df .repartition($"date") .write.mode(SaveMode.Append) .partitionBy("date") .parquet(s"$path")
-
==============================
2.가장 간단한 솔루션으로 실제 파티션을 대체하는 것입니다 :
가장 간단한 솔루션으로 실제 파티션을 대체하는 것입니다 :
df .repartition(to_date($"date")) .write.mode(SaveMode.Append) .partitionBy("date") .parquet(s"$path")
또한 시간 범위의 하루 어쩌면 시간 즉 당신의 DataFrame에 대한보다 정확한 분할을 사용할 수 있습니다. 다음은 작가 덜 정확한 될 수 있습니다. 즉 실제로 데이터의 양에 따라 달라집니다.
당신은 DataFrame 및 조항에 의해 파티션 쓰기를 분할하여 엔트로피를 감소시킬 수있다.
-
==============================
3.파이썬에서는 라파엘의 로스의 대답으로 다시 작성할 수 있습니다 :
파이썬에서는 라파엘의 로스의 대답으로 다시 작성할 수 있습니다 :
(df .repartition("date") .write.mode("append") .partitionBy("date") .parquet("{path}".format(path=path)))
당신은 또한 매우 큰 파티션이있는 문제를 방지하기 위해 .repartition 더 많은 열을 추가하는 것이 좋습니다 :
(df .repartition("date", another_column, yet_another_colum) .write.mode("append") .partitionBy("date) .parquet("{path}".format(path=path)))
-
==============================
4.저도 같은 문제를 가로 질러 와서 나는 유착 내 문제를 해결하여 할 수있다.
저도 같은 문제를 가로 질러 와서 나는 유착 내 문제를 해결하여 할 수있다.
df .coalesce(3) // number of parts/files .write.mode(SaveMode.Append) .parquet(s"$path")
병합 또는 재분할 자세한 유착하거나 다음 스파크를 참조 할 수 있습니다 다시 분할 사용에 대한 자세한 내용은
-
==============================
5.여기에서 내 대답을 복제 : https://stackoverflow.com/a/53620268/171916
여기에서 내 대답을 복제 : https://stackoverflow.com/a/53620268/171916
이것은 아주 잘 나를 위해 노력하고 있습니다 :
data.repartition(n, "key").write.partitionBy("key").parquet("/location")
그것은 (다시, 일화, 내 데이터 세트에) 빠른 출력 만에 다시 분할보다는 각 출력 파티션 (디렉토리)에서 N 파일을 생성, 빠른 병합 사용하는 것보다 (일화)입니다.
당신이 S3와 함께 작업하는 경우, 또한 (스파크 쓰기 아웃시 파일 생성 / 이름 변경 / 삭제를 많이 수행)하고 모든 정착 사용 하둡 FileUtil (또는 단지 AWS의 CLI는) 모든 것을 복사 할 된 후에는 로컬 드라이브에 최선을 다하고 추천 위에:
import java.net.URI import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} // ... def copy( in : String, out : String, sparkSession: SparkSession ) = { FileUtil.copy( FileSystem.get(new URI(in), sparkSession.sparkContext.hadoopConfiguration), new Path(in), FileSystem.get(new URI(out), sparkSession.sparkContext.hadoopConfiguration), new Path(out), false, sparkSession.sparkContext.hadoopConfiguration ) }
-
==============================
6.방법 하나에 모든 마루 파일을 통합지도 작업으로 다음과 같은 스크립트 실행을 시도에 대해 :
방법 하나에 모든 마루 파일을 통합지도 작업으로 다음과 같은 스크립트 실행을 시도에 대해 :
$의 하둡 항아리 \ /usr/hdp/2.3.2.0-2950/hadoop-mapreduce/hadoop-streaming-2.7.1.2.3.2.0-2950.jar -Dmapred.reduce.tasks = 1 \ -input "/ HDFS / 입 / DIR"\ - 출력 "/ HDFS / 출력 / DIR"\ -mapper 고양이 \ -reducer 고양이
from https://stackoverflow.com/questions/44459355/spark-dataframe-write-method-writing-many-small-files by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 스파크에 타임 스탬프로 문자열 필드를 변환하는 더 나은 방법 (0) | 2019.11.05 |
---|---|
[SCALA] java.lang.NoClassDefFoundError가 : 조직 / 아파치 / 스파크 / 스트리밍 / 트위터 / TwitterUtils $ TwitterPopularTags을 실행하는 동안 (0) | 2019.11.05 |
[SCALA] 어떤 종류의 스칼라에서 메모리 가변 데이터 테이블을 저장하기 위해 사용하는 방법? (0) | 2019.11.05 |
[SCALA] 대신 필터 withFilter (0) | 2019.11.05 |
[SCALA] 스칼라의 Iterable의 상위 N 요소를 얻을 수있는 간단한 방법 (0) | 2019.11.05 |