복붙노트

[SCALA] 아파치 스파크 : mapPartitions 대이란?

SCALA

아파치 스파크 : mapPartitions 대이란?

RDD의지도와 mapPartitions 방법의 차이점은 무엇입니까? 그리고지도 같은 또는 mapPartitions처럼 행동 flatMap합니까? 감사.

(편집하다) 즉, (어느 의미 또는 실행의 관점에서) 차이 무엇

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
      preservesPartitioning = true)
  }

과:

  def map[A, B](rdd: RDD[A], fn: (A => B))
               (implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
    rdd.map(fn)
  }

해결법

  1. ==============================

    1.방법지도 기능을 적용하여 RDD 결과 단일 요소로 소스 RDD의 각 요소로 변환한다. mapPartitions는 결과 (가능성 없음)의 여러 요소에 소스 RDD 각 파티션을 변환한다.

    방법지도 기능을 적용하여 RDD 결과 단일 요소로 소스 RDD의 각 요소로 변환한다. mapPartitions는 결과 (가능성 없음)의 여러 요소에 소스 RDD 각 파티션을 변환한다.

    둘, flatMap은 (MAP)를 하나의 요소에 작동하지 않고 (mapPartitions로) 결과의 여러 요소를 생성한다.

  2. ==============================

    2.

    val newRd = myRdd.mapPartitions(partition => {
      val connection = new DbConnection /*creates a db connection per partition*/
    
      val newPartition = partition.map(record => {
        readMatchingFromDB(record, connection)
      }).toList // consumes the iterator, thus calls readMatchingFromDB 
    
      connection.close() // close dbconnection here
      newPartition.iterator // create a new iterator
    })
    

    예. flatmap .. 실시 예 2의 자기 설명을 참조하시기 바랍니다.

    예 시나리오 : 우리는 특정 RDD 파티션 100K 요소가 있다면 우리는 우리가지도를 사용할 때 매핑 변환 100K 시간에 의해 사용되는 기능을 해고 할 것이다.

    우리가 mapPartitions를 사용하는 경우 반대로, 우리는 단지 특정 기능을 한 번 호출 할 것이다, 그러나 우리는 모두 100K 기록에 전달하고 다시 하나의 함수 호출에서 모든 응답을 얻을 것이다.

    지도는 특정 기능에 너무 많은 시간을 작동하기 때문에 함수가 비싼 뭔가 우리가 (mappartitions의 경우) 한 번에 모든 요소에 전달 된 경우는 수행 할 필요가 없습니다 것마다하고있다, 특히, 성능 향상이있을 것입니다.

    예 :

    val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
     val b = a.map(_.length)
     val c = a.zip(b)
     c.collect
     res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) 
    

    예 1

    val a = sc.parallelize(1 to 9, 3)
     def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
       var res = List[(T, T)]()
       var pre = iter.next
       while (iter.hasNext)
       {
         val cur = iter.next;
         res .::= (pre, cur)
         pre = cur;
       }
       res.iterator
     }
     a.mapPartitions(myfunc).collect
     res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8)) 
    

    예 2

    val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3)
     def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
       var res = List[Int]()
       while (iter.hasNext) {
         val cur = iter.next;
         res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
       }
       res.iterator
     }
     x.mapPartitions(myfunc).collect
     // some of the number are not outputted at all. This is because the random number generated for it is zero.
     res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10) 
    

    위의 프로그램은 다음과 같이 flatMap를 사용하여 작성 될 수있다.

    실시 예 2 사용 flatmap

    val x  = sc.parallelize(1 to 10, 3)
     x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
    
     res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10) 
    

    이 함수 번 / 파티션이 아닌 1 회 / 요소 호출 이후 mapPartitions 변환은지도보다 빠릅니다 ..

    또한 읽기 : foreachPartition 대 ​​foreach는 언제 무엇을 사용 하는가?

  3. ==============================

    3.지도 :

    지도 :

    MapPartitions

  4. from https://stackoverflow.com/questions/21185092/apache-spark-map-vs-mappartitions by cc-by-sa and MIT license