[HADOOP] HDFS에서 테이블을 미리 그룹화하고 제로 셔플 링으로 스파크에서 읽기
HADOOPHDFS에서 테이블을 미리 그룹화하고 제로 셔플 링으로 스파크에서 읽기
나는 스파크 일자리의 일환으로 내가 가입 / 코 그룹화하는 두 개의 테이블을 가지고 있는데, 이는 내가 일을 할 때마다 커다란 셔플이 발생한다. 정리 된 데이터를 한 번 저장하여 모든 작업에서 비용을 상환하고 일반 스파크의 일부로 이미 그룹화 된 데이터를 사용하여 셔플을 피하려고합니다.
이것을 시도하기 위해, 저는 마루판 형식으로 저장된 HDFS의 일부 데이터를 가지고 있습니다. Parquet 반복 필드를 사용하여 다음 스키마를 얻습니다.
여기서 [a 레코드]는 a 레코드의 배열을 나타냅니다. 나는 또한 일반적인 write.partitionBy ($ "date")를 사용하여 HDFS에서 날짜별로 데이터를 분할하고있다.
이 경우 레코드 및 레코드가 날짜별로 효과적으로 그룹화 된 것처럼 보입니다. 다음과 같은 작업을 수행 할 수 있습니다.
case class CogroupedData(date: Date, aRecords: Array[Int], bRecords: Array[Int])
val cogroupedData = spark.read.parquet("path/to/data").as[CogroupedData]
//Dataset[(Date,Int)] where the Int in the two sides multiplied
val results = cogroupedData
.flatMap(el => el.aRecords.zip(el.bRecords).map(pair => (el.date, pair._1 * pair._2)))
그리고 aRecords와 bRecords에 대해 두 개의 별개의 테이블에서 동등한 groupByKey 연산을 사용하여 얻은 결과를 날짜별로 입력하십시오.
두 가지의 차이점은 이미 그룹화 된 데이터와의 혼란을 피하는 것입니다. 그룹화 된 비용은 HDFS에서 유지함으로써 상각됩니다.
이제 질문입니다. Cogrouped 데이터 세트에서 두 개의 그룹화 된 데이터 세트를 파생 시키므로 표준 스파크 SQL 연산자 (cogroup, join 등)를 사용할 수 있습니다. 이것은 첫 번째 코드 예제가 작동하기 때문에 가능하지만 Spark은 / groupByKey / cogroup 등에 가입 할 때 데이터 해싱 / 셔플 링을 계속 주장합니다.
아래 코드 샘플을 가져 가십시오. 조인을 수행 할 때 셔플을하지 않고 아래를 실행할 수있는 방법이있을 것으로 기대합니다.
val cogroupedData = spark.read.parquet("path/to/data").as[CogroupedData]
val aRecords = cogroupedData
.flatMap(cog => cog.aRecords.map(a => (cog.date,a)))
val bRecords = cogroupedData
.flatMap(cog => cog.bRecords.map(b => (cog.date,b)))
val joined = aRecords.join(bRecords,Seq("date"))
문헌을 보면 cogroupedData에 알려진 파 티셔가있는 경우 RDD가 이미 파티셔닝되었다는 사실을 사용하고 파티셔너를 보존 할 수 있으므로 뒤 따르는 작업은 셔플이 발생하지 않아야합니다.
이를 달성하기 위해 필요한 것은 셔플을 발생시키지 않고 알려진 파 티셔 터를 사용하여 cogroupedData Dataset / rdd를 얻는 것입니다.
이미 시도한 다른 것들 :
누구든지 아이디어가 있으십니까?
해결법
-
==============================
1.두 가지 실수를 저질렀습니다.
두 가지 실수를 저질렀습니다.
그것을하는 올바른 방법은 다음과 같습니다.
from https://stackoverflow.com/questions/52006635/pre-cogrouping-tables-on-hdfs-and-reading-in-spark-with-zero-shuffling by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] Hadoop mapReduce HDFS에 값만 저장하는 방법 (0) | 2019.07.20 |
---|---|
[HADOOP] 외부 hadoop 클러스터 인 Spark에서 H / A namenodes로 구성된 URI로 hdfs에 액세스하는 방법? (0) | 2019.07.20 |
[HADOOP] 별거없이 sqoop을 실행할 수 있습니까? (0) | 2019.07.19 |
[HADOOP] Hadoop MapReduce 작업에서 다중 이퀄라이저 체인화 (0) | 2019.07.19 |
[HADOOP] R이 Rscript로 실행 중인지 어떻게 테스트합니까? (0) | 2019.07.19 |