복붙노트

[SCALA] 스파크 : 단일 키 RDD와 2 튜플 키 RDD 가입을위한 최선의 전략은 무엇입니까?

SCALA

스파크 : 단일 키 RDD와 2 튜플 키 RDD 가입을위한 최선의 전략은 무엇입니까?

내가 가입하려는 두 RDD 년대를하고는 다음과 같습니다 :

val rdd1:RDD[(T,U)]
val rdd2:RDD[((T,W), V)]

rdd1의 키 값이 rdd2의 튜플 키 값이 고유한지도 독특하고 있다는 경우가 발생합니다. 나는 다음과 같은 RDD를 얻을 수 있도록 두 개의 데이터 세트에 참여하고 싶습니다 :

val rdd_joined:RDD[((T,W), (U,V))]

이를 달성하기위한 가장 효율적인 방법은 무엇입니까? 여기에 내가 생각 한 몇 가지 아이디어가 있습니다.

옵션 1:

val m = rdd1.collectAsMap
val rdd_joined = rdd2.map({case ((t,w), u) => ((t,w), u, m.get(t))})

옵션 2 :

val distinct_w = rdd2.map({case ((t,w), u) => w}).distinct
val rdd_joined = rdd1.cartesian(distinct_w).join(rdd2)

옵션 1은 오른쪽 마스터 모든 데이터를 수집 할 것인가? rdd1가 큰 경우 그래서 (이, 내 경우에는 상대적으로 큰의 rdd2보다 작은 크기의 순서하지만) 좋은 방법은 아닌 것 같아. 옵션 2도 매우 비효율적 인 것 같습니다 못생긴 별개의 직교 제품을한다. 이지도의 키가에 위치한 공동되도록 "스마트"방식으로 방송 좋을 것이다하지만 내 마음을 교차 (아직 시도하지 않은) 또 다른 가능성은, 옵션 1을하고지도를 방송하는 것입니다 rdd2의 키.

사람이 전에 상황이 이런 종류의 건너 했습니까? 나는 당신의 생각을 드리겠습니다.

감사!

해결법

  1. ==============================

    1.하나의 옵션은 브로드 캐스트 드라이버에 rdd1를 수집하고 모든 매퍼에 방송에 의해 가입 수행하는 것입니다; 제대로, 이것은 우리가 큰 rdd2의 RDD의 비싼 셔플을 피할 수있게된다 :

    하나의 옵션은 브로드 캐스트 드라이버에 rdd1를 수집하고 모든 매퍼에 방송에 의해 가입 수행하는 것입니다; 제대로, 이것은 우리가 큰 rdd2의 RDD의 비싼 셔플을 피할 수있게된다 :

    val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C")))
    val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333)))
    
    val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap())
    val joined = rdd2.mapPartitions({ iter =>
      val m = rdd1Broadcast.value
      for {
        ((t, w), u) <- iter
        if m.contains(t)
      } yield ((t, w), (u, m.get(t).get))
    }, preservesPartitioning = true)
    

    preservesPartitioning = 사실이지도 기능을 rdd2의 키를 수정하지 않는 스파크 말한다; 이렇게 점화가 (t, w) 키에 기초 조인 후속 작업 재분할 rdd2을 방지 할 수 있습니다.

    이 드라이버에서 통신 병목 현상을 수반하기 때문에이 방송은 비효율적 일 수있다. 원칙적으로, 운전자를 포함하지 않고 다른 한 RDD를 방송하는 것이 가능하다; 내가 일반화 스파크에 추가하고 싶은이의 프로토 타입이있다.

    또 다른 옵션은 rdd2의 열쇠를 다시 매핑과 불꽃이 방법을 조인을 사용하는 것입니다; 이 전체 rdd2의 셔플 (그리고 아마도 rdd1)를 포함 할 것이다 :

    rdd1.join(rdd2.map {
      case ((t, w), u) => (t, (w, u))
    }).map {
      case (t, (v, (w, u))) => ((t, w), (u, v))
    }.collect()
    

    내 샘플 입력에서 이러한 방법 모두 동일한 결과를 생성 :

    res1: Array[((Int, java.lang.String), (Int, java.lang.String))] = Array(((1,Z),(111,A)), ((1,ZZ),(111,A)), ((2,Y),(222,B)), ((3,X),(333,C)))
    

    세 번째 옵션은 t이 핵심 그래서 다음 위의 조인을 수행 rdd2을 구조 조정하는 것입니다.

  2. ==============================

    2.그것을 할 수있는 또 다른 방법은 사용자 정의 파티션 프로그램을 만들고 다음 RDDs 가입 zipPartitions을 사용하는 것입니다.

    그것을 할 수있는 또 다른 방법은 사용자 정의 파티션 프로그램을 만들고 다음 RDDs 가입 zipPartitions을 사용하는 것입니다.

    import org.apache.spark.HashPartitioner
    
    class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) {
    
      override def getPartition(key: Any): Int = key match {
        case k: Tuple2[Int, String] => super.getPartition(k._1)
        case _ => super.getPartition(key)
      }
    
    }
    
    val numSplits = 8
    val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C"))).partitionBy(new HashPartitioner(numSplits))
    val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((1, "AA"), 123), ((2, "Y"), 222), ((3, "X"), 333))).partitionBy(new RDD2Partitioner(numSplits))
    
    val result = rdd2.zipPartitions(rdd1)(
      (iter2, iter1) => {
        val m = iter1.toMap
        for {
            ((t: Int, w), u) <- iter2
            if m.contains(t)
          } yield ((t, w), (u, m.get(t).get))
      }
    ).partitionBy(new HashPartitioner(numSplits))
    
    result.glom.collect
    
  3. from https://stackoverflow.com/questions/17621596/spark-whats-the-best-strategy-for-joining-a-2-tuple-key-rdd-with-single-key-rd by cc-by-sa and MIT license