복붙노트

[SCALA] 어떻게 스칼라 스파크에서 RDD를 정렬하려면?

SCALA

어떻게 스칼라 스파크에서 RDD를 정렬하려면?

스파크 방법 sortByKey 읽기 :

sortByKey([ascending], [numTasks])   When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

그것이 가능한 결과를 그냥 "N"금액을 반환하는 것입니다. 대신 모든 결과를 반환하는 그래서 그냥 최고 10 내가 배열 사용 걸릴 방법에 정렬 된 집합을 변환 할 수 있지만이 O (N) 작업이기 때문에보다 효율적인 방법이를 반환?

해결법

  1. ==============================

    1.대부분의 경우 당신은 이미 소스 코드를 정독했다 :

    대부분의 경우 당신은 이미 소스 코드를 정독했다 :

      class OrderedRDDFunctions {
       // <snip>
      def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
        val part = new RangePartitioner(numPartitions, self, ascending)
        val shuffled = new ShuffledRDD[K, V, P](self, part)
        shuffled.mapPartitions(iter => {
          val buf = iter.toArray
          if (ascending) {
            buf.sortWith((x, y) => x._1 < y._1).iterator
          } else {
            buf.sortWith((x, y) => x._1 > y._1).iterator
          }
        }, preservesPartitioning = true)
      }
    

    그리고, 당신이 말한대로, 전체 데이터는 셔플 단계를 통과해야 함 - 코드와 같이.

    그러나 이후에 호출 테이크 (K)에 대한 우려 때문에 정확하지 않을 수 있습니다. 이 작업은 모든 N 항목을 통해주기를하지 않습니다

      /**
       * Take the first num elements of the RDD. It works by first scanning one partition, and use the
       * results from that partition to estimate the number of additional partitions needed to satisfy
       * the limit.
       */
      def take(num: Int): Array[T] = {
    

    다음 그래서, 그것은 보일 수있을 것입니다 :

  2. ==============================

    2.당신은 단지 10를 사용 rdd.top (10)가 필요합니다. 그것은 정렬 방지, 그래서 더 빠릅니다.

    당신은 단지 10를 사용 rdd.top (10)가 필요합니다. 그것은 정렬 방지, 그래서 더 빠릅니다.

    rdd.top 다음 힙을 병합, 힙의 각 파티션의 상위 N 수집 데이터를 통해 통과 한 평행한다. 그것은 O (rdd.count) 동작이다. 정렬하면 O (rdd.count 로그 rdd.count), 그리고 데이터 전송을 많이 발생할 것입니다 - 모든 데이터가 네트워크를 통해 전송 될 수 있도록이 셔플을한다.

  3. ==============================

    3.또 다른 옵션은, PySpark 1.2.0에서 적어도 takeOrdered의 사용이다.

    또 다른 옵션은, PySpark 1.2.0에서 적어도 takeOrdered의 사용이다.

    오름차순 :

    rdd.takeOrdered(10)
    

    내림차순 :

    rdd.takeOrdered(10, lambda x: -x)
    

    K, V 쌍에 대한 최고 K 값 :

    rdd.takeOrdered(10, lambda (k, v): -v)
    
  4. from https://stackoverflow.com/questions/23838614/how-to-sort-an-rdd-in-scala-spark by cc-by-sa and MIT license