[SCALA] 스파크 : 단일 키 RDD와 2 튜플 키 RDD 가입을위한 최선의 전략은 무엇입니까?
SCALA스파크 : 단일 키 RDD와 2 튜플 키 RDD 가입을위한 최선의 전략은 무엇입니까?
내가 가입하려는 두 RDD 년대를하고는 다음과 같습니다 :
val rdd1:RDD[(T,U)]
val rdd2:RDD[((T,W), V)]
rdd1의 키 값이 rdd2의 튜플 키 값이 고유한지도 독특하고 있다는 경우가 발생합니다. 나는 다음과 같은 RDD를 얻을 수 있도록 두 개의 데이터 세트에 참여하고 싶습니다 :
val rdd_joined:RDD[((T,W), (U,V))]
이를 달성하기위한 가장 효율적인 방법은 무엇입니까? 여기에 내가 생각 한 몇 가지 아이디어가 있습니다.
옵션 1:
val m = rdd1.collectAsMap
val rdd_joined = rdd2.map({case ((t,w), u) => ((t,w), u, m.get(t))})
옵션 2 :
val distinct_w = rdd2.map({case ((t,w), u) => w}).distinct
val rdd_joined = rdd1.cartesian(distinct_w).join(rdd2)
옵션 1은 오른쪽 마스터 모든 데이터를 수집 할 것인가? rdd1가 큰 경우 그래서 (이, 내 경우에는 상대적으로 큰의 rdd2보다 작은 크기의 순서하지만) 좋은 방법은 아닌 것 같아. 옵션 2도 매우 비효율적 인 것 같습니다 못생긴 별개의 직교 제품을한다. 이지도의 키가에 위치한 공동되도록 "스마트"방식으로 방송 좋을 것이다하지만 내 마음을 교차 (아직 시도하지 않은) 또 다른 가능성은, 옵션 1을하고지도를 방송하는 것입니다 rdd2의 키.
사람이 전에 상황이 이런 종류의 건너 했습니까? 나는 당신의 생각을 드리겠습니다.
감사!
해결법
-
==============================
1.하나의 옵션은 브로드 캐스트 드라이버에 rdd1를 수집하고 모든 매퍼에 방송에 의해 가입 수행하는 것입니다; 제대로, 이것은 우리가 큰 rdd2의 RDD의 비싼 셔플을 피할 수있게된다 :
하나의 옵션은 브로드 캐스트 드라이버에 rdd1를 수집하고 모든 매퍼에 방송에 의해 가입 수행하는 것입니다; 제대로, 이것은 우리가 큰 rdd2의 RDD의 비싼 셔플을 피할 수있게된다 :
val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))) val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333))) val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap()) val joined = rdd2.mapPartitions({ iter => val m = rdd1Broadcast.value for { ((t, w), u) <- iter if m.contains(t) } yield ((t, w), (u, m.get(t).get)) }, preservesPartitioning = true)
preservesPartitioning = 사실이지도 기능을 rdd2의 키를 수정하지 않는 스파크 말한다; 이렇게 점화가 (t, w) 키에 기초 조인 후속 작업 재분할 rdd2을 방지 할 수 있습니다.
이 드라이버에서 통신 병목 현상을 수반하기 때문에이 방송은 비효율적 일 수있다. 원칙적으로, 운전자를 포함하지 않고 다른 한 RDD를 방송하는 것이 가능하다; 내가 일반화 스파크에 추가하고 싶은이의 프로토 타입이있다.
또 다른 옵션은 rdd2의 열쇠를 다시 매핑과 불꽃이 방법을 조인을 사용하는 것입니다; 이 전체 rdd2의 셔플 (그리고 아마도 rdd1)를 포함 할 것이다 :
rdd1.join(rdd2.map { case ((t, w), u) => (t, (w, u)) }).map { case (t, (v, (w, u))) => ((t, w), (u, v)) }.collect()
내 샘플 입력에서 이러한 방법 모두 동일한 결과를 생성 :
res1: Array[((Int, java.lang.String), (Int, java.lang.String))] = Array(((1,Z),(111,A)), ((1,ZZ),(111,A)), ((2,Y),(222,B)), ((3,X),(333,C)))
세 번째 옵션은 t이 핵심 그래서 다음 위의 조인을 수행 rdd2을 구조 조정하는 것입니다.
-
==============================
2.그것을 할 수있는 또 다른 방법은 사용자 정의 파티션 프로그램을 만들고 다음 RDDs 가입 zipPartitions을 사용하는 것입니다.
그것을 할 수있는 또 다른 방법은 사용자 정의 파티션 프로그램을 만들고 다음 RDDs 가입 zipPartitions을 사용하는 것입니다.
import org.apache.spark.HashPartitioner class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) { override def getPartition(key: Any): Int = key match { case k: Tuple2[Int, String] => super.getPartition(k._1) case _ => super.getPartition(key) } } val numSplits = 8 val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))).partitionBy(new HashPartitioner(numSplits)) val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((1, "AA"), 123), ((2, "Y"), 222), ((3, "X"), 333))).partitionBy(new RDD2Partitioner(numSplits)) val result = rdd2.zipPartitions(rdd1)( (iter2, iter1) => { val m = iter1.toMap for { ((t: Int, w), u) <- iter2 if m.contains(t) } yield ((t, w), (u, m.get(t).get)) } ).partitionBy(new HashPartitioner(numSplits)) result.glom.collect
from https://stackoverflow.com/questions/17621596/spark-whats-the-best-strategy-for-joining-a-2-tuple-key-rdd-with-single-key-rd by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] _ * 스칼라의 의미 : 무슨 일이 PARAM은 무엇입니까? (0) | 2019.11.03 |
---|---|
[SCALA] 스칼라에 대한 JVM 힙 크기를 증가? (0) | 2019.11.03 |
[SCALA] 상관, AnyVal는, AnyRef는, 객체와 자바 코드에서 사용될 경우 그들이 어떻게지도 할 사이의 관계는 무엇인가? (0) | 2019.11.03 |
[SCALA] 자기 형 주석이 자기의 차이? (0) | 2019.11.02 |
[SCALA] 임의의 스칼라 코드의 위치 동안 통역에 드롭 (0) | 2019.11.02 |