복붙노트

[SCALA] 병합은 전체 스테이지의 평행도를 감소 (스파크)

SCALA

병합은 전체 스테이지의 평행도를 감소 (스파크)

때로는 "최적화하는"비효율적 인 방식으로 dataframe 계획을 불꽃. 스파크 2.1 다음 예를 고려한다 (또한 스파크 1.6 재생할 수있다) :

val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value")

val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})

val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))

df_result
.coalesce(1)
.saveAsTable(tablename)

이 예에서 나는 (이 그냥 문제를 입증하는 예이다)를 dataframe의 비싼 전환 후 1 개 파일을 작성합니다. 스파크는 UDF에만 따라서 병렬 파괴 한 파티션을 포함하는 dataframe인가되도록 가입 병합 (1)를 이동 (흥미롭게 재분할 (1)는이 방식으로 작동하지 않는다).

일반화,이 동작은 내 변화의 특정 부분에 병렬 처리를 증가 할 때 발생하지만 감소 병렬 이후.

나는 dataframe를 캐시 한 후 dataframe의 전체 평가를 트리거로 구성되어 하나의 해결 방법을 발견했습니다 :

val df = sparkContext.parallelize((1 to 500).map(i=> scala.util.Random.nextDouble),100).toDF("value")

val expensiveUDF = udf((d:Double) => {Thread.sleep(100);d})

val df_result = df
.withColumn("udfResult",expensiveUDF($"value"))
.cache

df_result.rdd.count // trigger computation

df_result
.coalesce(1)
.saveAsTable(tablename)

내 질문은 : 이러한 경우에 감소 병렬로하지 스파크를 말할 수있는 또 다른 방법은 무엇입니까?

해결법

  1. ==============================

    1.사실 그것은 SparkSQL는 실행 계획에서 볼 수 있듯이, 병합 연산자의 위치를 ​​변경하지 않습니다 때문이 아니라 SparkSQL의 최적화이다 :

    사실 그것은 SparkSQL는 실행 계획에서 볼 수 있듯이, 병합 연산자의 위치를 ​​변경하지 않습니다 때문이 아니라 SparkSQL의 최적화이다 :

    Coalesce 1
    +- *Project [value#2, UDF(value#2) AS udfResult#11]
       +- *SerializeFromObject [input[0, double, false] AS value#2]
          +- Scan ExternalRDDScan[obj#1]
    

    나는 병합 API의 설명에서 단락을 인용 :

    참고 :이 단락은 JIRA의 SPARK-19399에 의해 추가됩니다. 그래서 2.0의 API에서 발견 할 수 없습니다.

    병합 API는 셔플을 수행하지만, 이전 RDD 현재 RDD 사이의 좁은 의존성의 결과를하지 않습니다. RDD 게으른 평가이기 때문에, 계산은 실제로 합체 파티션으로 이루어집니다.

    이를 방지하기 위해, 당신은 다시 파티션 API를 사용합니다.

  2. from https://stackoverflow.com/questions/44494656/coalesce-reduces-parallelism-of-entire-stage-spark by cc-by-sa and MIT license