복붙노트

[SCALA] HashPartitioner는 어떻게 작동합니까?

SCALA

HashPartitioner는 어떻게 작동합니까?

나는 HashPartitioner의 문서를 읽어. 불행하게도 많은 것도 API 호출을 제외하고 설명되지 않았다. 나는 HashPartitioner이 키의 해시를 기반으로 분산 세트를 분할 가정하에입니다. 예를 들어 내 데이터가 같은 경우

(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)

그래서 파티션 프로그램은 동일한 파티션에서 떨어지는 같은 키를 다른 파티션으로이 둘 것입니다. 그러나 나는 생성자 인수의 의미를 이해하지 못하는

new HashPartitoner(numPartitions) //What does numPartitions do?

내가 한 경우 위의 데이터 세트를 위해 어떻게 다른 결과가 나올 것입니다

new HashPartitoner(1)
new HashPartitoner(2)
new HashPartitoner(10)

어떻게 HashPartitioner 실제로 작동합니까?

해결법

  1. ==============================

    1.음, 데이터 집합이 소폭 더 재미있게 만들 수 있습니다 :

    음, 데이터 집합이 소폭 더 재미있게 만들 수 있습니다 :

    val rdd = sc.parallelize(for {
        x <- 1 to 3
        y <- 1 to 2
    } yield (x, None), 8)
    

    우리는 여섯 개 요소가 :

    rdd.count
    
    Long = 6
    

    어떤 파티션하지 :

    rdd.partitioner
    
    Option[org.apache.spark.Partitioner] = None
    

    여덟 개 파티션 :

    rdd.partitions.length
    
    Int = 8
    

    이제 파티션 당 요소의 수를 계산하는 작은 도우미를 정의 할 수 있습니다 :

    import org.apache.spark.rdd.RDD
    
    def countByPartition(rdd: RDD[(Int, None.type)]) = {
        rdd.mapPartitions(iter => Iterator(iter.length))
    }
    

    우리가 파티션 프로그램을 가지고 있지 않기 때문에 우리의 데이터 집합 (불꽃의 기본 파티션 나누기 계획) 파티션 사이에 균일하게 분포되어있다 :

    countByPartition(rdd).collect()
    
    Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
    

    이제 우리의 데이터 집합을 다시 분할 할 수 있습니다 :

    import org.apache.spark.HashPartitioner
    val rddOneP = rdd.partitionBy(new HashPartitioner(1))
    

    HashPartitioner에 전달 된 매개 변수는 파티션의 수를 정의하기 때문에 우리는 하나 개의 파티션을 기대하고 있습니다 :

    rddOneP.partitions.length
    
    Int = 1
    

    우리가 하나 개의 파티션을 가지고 있기 때문에이 모든 요소를 ​​포함합니다 :

    countByPartition(rddOneP).collect
    
    Array[Int] = Array(6)
    

    셔플 후 값의 순서는 비 결정적이라고합니다.

    같은 방법으로 우리는 HashPartitioner를 사용하는 경우 (2)

    val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
    

    우리는이 개 파티션을 얻을 것이다 :

    rddTwoP.partitions.length
    
    Int = 2
    

    RDD는 키 데이터에 의해 분할되어 있기 때문에 더 이상 균일하게 분포되지 않습니다 :

    countByPartition(rddTwoP).collect()
    
    Array[Int] = Array(2, 4)
    

    세 개의 키와 hashCode 모드 numPartitions의 두 개의 서로 다른 값을 함께하기 때문에 여기에 예상치 못한 아무것도 없다 :

    (1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
    
    scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
    

    그냥 위를 확인합니다 :

    rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
    
    Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
    

    마지막으로 HashPartitioner (7) 우리는 일곱 개 파티션을 얻을,이 개 요소가 비어 있지 않은 각 3 :

    val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
    rddSevenP.partitions.length
    
    Int = 7
    
    countByPartition(rddTenP).collect()
    
    Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
    

  2. ==============================

    2.RDD이 그것이 일부 부품 수를 분할 의미 배포됩니다. 이 파티션 각각은 다른 시스템에서 잠재적이다. 인수 numPartitions와 해시 파티션 프로그램은 다음과 같은 방법으로 페어 (키, 값)를 배치 할 것을 파티션 선택 :

    RDD이 그것이 일부 부품 수를 분할 의미 배포됩니다. 이 파티션 각각은 다른 시스템에서 잠재적이다. 인수 numPartitions와 해시 파티션 프로그램은 다음과 같은 방법으로 페어 (키, 값)를 배치 할 것을 파티션 선택 :

  3. ==============================

    3.HashPartitioner.getPartition 방법은 인수로 키를 받아서 키가 속해있는 파티션의 인덱스를 반환합니다. 파티션 프로그램은 유효한 지표가 무엇인지 알고있다, 그래서 권리 범위에 숫자를 반환합니다. 파티션의 수는 numPartitions 생성자 인수를 통해 지정됩니다.

    HashPartitioner.getPartition 방법은 인수로 키를 받아서 키가 속해있는 파티션의 인덱스를 반환합니다. 파티션 프로그램은 유효한 지표가 무엇인지 알고있다, 그래서 권리 범위에 숫자를 반환합니다. 파티션의 수는 numPartitions 생성자 인수를 통해 지정됩니다.

    구현은 대략 key.hashCode () %의 numPartitions를 반환합니다. 자세한 내용은 Partitioner.scala를 참조하십시오.

  4. from https://stackoverflow.com/questions/31424396/how-does-hashpartitioner-work by cc-by-sa and MIT license