복붙노트

[HADOOP] 각 파티션의 요소 수가 같은 동일한 크기의 파티션으로 구성된 Spark RDD의 사용자 정의 파티션을 정의하는 방법은 무엇입니까?

HADOOP

각 파티션의 요소 수가 같은 동일한 크기의 파티션으로 구성된 Spark RDD의 사용자 정의 파티션을 정의하는 방법은 무엇입니까?

나는 스파크가 처음이다. 나는 [RDD] 요소의 큰 데이터 집합을 가지고 있으며 그것을 요소의 순서를 유지하면서 똑같은 크기의 두 개의 분할 영역으로 나누고 싶습니다. 나는 RangePartitioner를 사용하여

var data = partitionedFile.partitionBy(new RangePartitioner(2, partitionedFile))

이는 대략 동일하지만 크기가 동일하지 않은 요소의 유지 순서를 대략 나눌 수 있기 때문에 만족스러운 결과를 제공하지 못합니다. 예를 들어, 64 개의 요소가 있다면, 우리는 Rangepartitioner는 31 개의 요소와 33 개의 요소로 나뉩니다.

나는 처음 32 개 요소를 반으로, 나머지 절반은 32 개 요소를 두 번째로 포함하는 분할 자 (partitioner)가 필요합니다. 사용자 정의 파 티셔 터를 사용하여 요소의 순서를 유지하면서 동일한 크기의 두 반쪽을 사용하는 방법을 제안하여 도움을받을 수 있습니까?

해결법

  1. ==============================

    1.파티션은 파티션에 키를 할당하여 작동합니다. 그러한 분배자를 만들기 위해 키 배포에 대한 사전 지식이 필요하거나 모든 키를 살펴야합니다. 이것이 스파크가 당신에게 제공하지 않는 이유입니다.

    파티션은 파티션에 키를 할당하여 작동합니다. 그러한 분배자를 만들기 위해 키 배포에 대한 사전 지식이 필요하거나 모든 키를 살펴야합니다. 이것이 스파크가 당신에게 제공하지 않는 이유입니다.

    일반적으로 그런 분할자를 필요로하지 않습니다. 사실 같은 크기의 파티션이 필요한 유스 케이스를 만들 수 없습니다. 요소의 수가 이상한 경우에는 어떻게해야합니까?

    어쨌든, 순차적 인 Ints에 의해 키가 입력 된 RDD가 있고 합계가 몇 개인 지 알 수 있습니다. 그런 다음 아래와 같이 사용자 정의 Partitioner를 작성할 수 있습니다.

    class ExactPartitioner[V](
        partitions: Int,
        elements: Int)
      extends Partitioner {
    
      def getPartition(key: Any): Int = {
        val k = key.asInstanceOf[Int]
        // `k` is assumed to go continuously from 0 to elements-1.
        return k * partitions / elements
      }
    }
    
  2. ==============================

    2.이 대답은 Daniel로부터 영감을 얻었지만 사람들의 복사 및 붙여 넣기에 대한 예제가있는 전체 구현 (포주 라이브러리 패턴 사용)을 제공합니다. :)

    이 대답은 Daniel로부터 영감을 얻었지만 사람들의 복사 및 붙여 넣기에 대한 예제가있는 전체 구현 (포주 라이브러리 패턴 사용)을 제공합니다. :)

    import RDDConversions._
    
    trait RDDWrapper[T] {
      def rdd: RDD[T]
    }
    
    // TODO View bounds are deprecated, should use context bounds
    // Might need to change ClassManifest for ClassTag in spark 1.0.0
    case class RichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
      rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] {
      // Here we use a single Long to try to ensure the sort is balanced, 
      // but for really large dataset, we may want to consider
      // using a tuple of many Longs or even a GUID
      def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] =
        rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey()
        .grouped(numPartitions).map(t => (t._1._1, t._2))
    }
    
    case class RichRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] {
      def grouped(size: Int): RDD[T] = {
        // TODO Version where withIndex is cached
        val withIndex = rdd.mapPartitions(_.zipWithIndex)
    
        val startValues =
          withIndex.mapPartitionsWithIndex((i, iter) => 
            Iterator((i, iter.toIterable.last))).toArray().toList
          .sortBy(_._1).map(_._2._2.toLong).scan(-1L)(_ + _).map(_ + 1L)
    
        withIndex.mapPartitionsWithIndex((i, iter) => iter.map {
          case (value, index) => (startValues(i) + index.toLong, value)
        })
        .partitionBy(new Partitioner {
          def numPartitions: Int = size
          def getPartition(key: Any): Int = 
            (key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt
        })
        .map(_._2)
      }
    }
    

    그런 다음 다른 파일에서

    // TODO modify above to be implicit class, rather than have implicit conversions
    object RDDConversions {
      implicit def toRichRDD[T: ClassManifest](rdd: RDD[T]): RichRDD[T] = 
        new RichRDD[T](rdd)
      implicit def toRichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](
        rdd: RDD[(K, V)]): RichPairRDD[K, V] = RichPairRDD(rdd)
      implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd
    }
    

    그런 다음 당신의 유스 케이스에 당신은 단지 (이미 정렬되었다고 가정하고)

    import RDDConversions._
    
    yourRdd.grouped(2)
    

    면책 조항 : 테스트하지 않은, 좀 그냥 바로이 답변에 쓴

  3. from https://stackoverflow.com/questions/23127329/how-to-define-custom-partitioner-for-spark-rdds-of-equally-sized-partition-where by cc-by-sa and MIT license