[SCALA] 아파치 스파크 : mapPartitions 대이란?
SCALA아파치 스파크 : mapPartitions 대이란?
RDD의지도와 mapPartitions 방법의 차이점은 무엇입니까? 그리고지도 같은 또는 mapPartitions처럼 행동 flatMap합니까? 감사.
(편집하다) 즉, (어느 의미 또는 실행의 관점에서) 차이 무엇
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.mapPartitions({ iter: Iterator[A] => for (i <- iter) yield fn(i) },
preservesPartitioning = true)
}
과:
def map[A, B](rdd: RDD[A], fn: (A => B))
(implicit a: Manifest[A], b: Manifest[B]): RDD[B] = {
rdd.map(fn)
}
해결법
-
==============================
1.방법지도 기능을 적용하여 RDD 결과 단일 요소로 소스 RDD의 각 요소로 변환한다. mapPartitions는 결과 (가능성 없음)의 여러 요소에 소스 RDD 각 파티션을 변환한다.
방법지도 기능을 적용하여 RDD 결과 단일 요소로 소스 RDD의 각 요소로 변환한다. mapPartitions는 결과 (가능성 없음)의 여러 요소에 소스 RDD 각 파티션을 변환한다.
둘, flatMap은 (MAP)를 하나의 요소에 작동하지 않고 (mapPartitions로) 결과의 여러 요소를 생성한다.
-
==============================
2.
val newRd = myRdd.mapPartitions(partition => { val connection = new DbConnection /*creates a db connection per partition*/ val newPartition = partition.map(record => { readMatchingFromDB(record, connection) }).toList // consumes the iterator, thus calls readMatchingFromDB connection.close() // close dbconnection here newPartition.iterator // create a new iterator })
예. flatmap .. 실시 예 2의 자기 설명을 참조하시기 바랍니다.
예 시나리오 : 우리는 특정 RDD 파티션 100K 요소가 있다면 우리는 우리가지도를 사용할 때 매핑 변환 100K 시간에 의해 사용되는 기능을 해고 할 것이다.
우리가 mapPartitions를 사용하는 경우 반대로, 우리는 단지 특정 기능을 한 번 호출 할 것이다, 그러나 우리는 모두 100K 기록에 전달하고 다시 하나의 함수 호출에서 모든 응답을 얻을 것이다.
지도는 특정 기능에 너무 많은 시간을 작동하기 때문에 함수가 비싼 뭔가 우리가 (mappartitions의 경우) 한 번에 모든 요소에 전달 된 경우는 수행 할 필요가 없습니다 것마다하고있다, 특히, 성능 향상이있을 것입니다.
예 :
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.map(_.length) val c = a.zip(b) c.collect res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
예 1
val a = sc.parallelize(1 to 9, 3) def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = { var res = List[(T, T)]() var pre = iter.next while (iter.hasNext) { val cur = iter.next; res .::= (pre, cur) pre = cur; } res.iterator } a.mapPartitions(myfunc).collect res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
예 2
val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9,10), 3) def myfunc(iter: Iterator[Int]) : Iterator[Int] = { var res = List[Int]() while (iter.hasNext) { val cur = iter.next; res = res ::: List.fill(scala.util.Random.nextInt(10))(cur) } res.iterator } x.mapPartitions(myfunc).collect // some of the number are not outputted at all. This is because the random number generated for it is zero. res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)
위의 프로그램은 다음과 같이 flatMap를 사용하여 작성 될 수있다.
실시 예 2 사용 flatmap
val x = sc.parallelize(1 to 10, 3) x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
이 함수 번 / 파티션이 아닌 1 회 / 요소 호출 이후 mapPartitions 변환은지도보다 빠릅니다 ..
또한 읽기 : foreachPartition 대 foreach는 언제 무엇을 사용 하는가?
-
==============================
3.지도 :
지도 :
MapPartitions
from https://stackoverflow.com/questions/21185092/apache-spark-map-vs-mappartitions by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 어떻게 SBT 자식에서 종속 아티팩트를 당겨 수 있습니까? (0) | 2019.10.31 |
---|---|
[SCALA] 차이가 감소하고 foldLeft / 함수형 프로그래밍 (특히 스칼라 및 스칼라 API)를 폴드 간의? (0) | 2019.10.31 |
[SCALA] 어떻게 스칼라 루프의 탈출합니까? (0) | 2019.10.31 |
[SCALA] 스칼라에서 객체와 클래스의 차이 (0) | 2019.10.31 |
[SCALA] 형식 유추는 .toSet로 만든 설정에 실패? (0) | 2019.10.31 |