복붙노트

[SCALA] 스파크 마지막 작업 방법을 개선하는 첫 번째 199 이상 100 배 시간 소요

SCALA

스파크 마지막 작업 방법을 개선하는 첫 번째 199 이상 100 배 시간 소요

dataframes를 사용하여 쿼리를 실행하는 동안 나는 몇 가지 성능 문제를보고하고있다. 나는 오랫동안 마지막으로 작업을 실행하면 데이터가 최적으로 방해받지 있다는 신호가 될 수 있다는 것을, 내 연구에 봐 왔지만,이 문제를 해결하기위한 상세한 과정을 발견하지 않았습니다.

나는 dataframes로 두 테이블을로드 오프 시작하고, 나는 다음 하나 개의 필드에 해당 테이블에 합류하고있다. 내가 (다시 분할)에 의해 배포 추가했는데, 정렬, 성능을 개선하기 위해,하지만 여전히이 하나의 긴 실행 마지막 작업을보고하고있다. 다음은 간단한 내 코드의 버전, 참고 쿼리 하나, 둘, 몇 가지 값을 계산이 간단하고 사용 UDF를 실제로하지 않은 것입니다.

나는 spark.sql.shuffle에 대한 몇 가지 다른 설정을 시도했습니다. 나는 100를 시도하지만 (정말 정직하게 많은이 디버깅하지 않았다) 실패했습니다. 나는 4000를 300했는데, 8000 성능은 각각의 증가와 감소. 나는 각 파일이 한 시간이다 데이터의 하루를 선택하고있다.

val df1 = sqlContext.sql("Select * from Table1")
val df2 = sqlContext.sql("Select * from Table2")

val distributeDf1 = df1
    .repartition(df1("userId"))
    .sortWithinPartitions(df1("userId"))

val distributeDf2 = df2
    .repartition(df2("userId"))
    .sortWithinPartitions(df2("userId"))

distributeDf1.registerTempTable("df1")
distributeDf2.registerTempTable("df2")

val df3 = sqlContext
  .sql("""
    Select 
      df1.* 
    from 
      df1 
    left outer join df2 on 
      df1.userId = df2.userId""")

이 userId를하여 파티션이 좋지 않은 것 때문에, 내가 대신 타임 스탬프로 분할 할 수있다. 내가 이렇게하면, 난 그냥 날짜 +의 시간을해야합니까? 내가 미만이 200 독특한 콤보가 있다면, 나는 빈 집행을해야합니다?

해결법

  1. ==============================

    1.당신은 분명히 거대한 올바른 데이터 스큐에 문제가 있습니다. 당신이 제공 한 통계 ㄱ 살펴 수 있습니다 :

    당신은 분명히 거대한 올바른 데이터 스큐에 문제가 있습니다. 당신이 제공 한 통계 ㄱ 살펴 수 있습니다 :

    df1 = [mean=4.989209978967438, stddev=2255.654165352454, count=2400088] 
    df2 = [mean=1.0, stddev=0.0, count=18408194]
    

    2000 이상 평균 약 5과 표준 편차를 사용하면 긴 꼬리를 얻을.

    일부 키가 훨씬 더 자주 이외의 집행을 다시 분할 후이기 때문에 남아있는 것보다 할 더 많은 일을해야합니다.

    당신의 설명 Furthermoreb 문제가 단일 또는 어떤 해시 동일한 파티션에 몇 키로 될 수 있음을 시사한다.

    자, 먼저 이상치 (의사)를 식별 할 수 있습니다 :

    val mean = 4.989209978967438 
    val sd = 2255.654165352454
    
    val df1 = sqlContext.sql("Select * from Table1")
    val counts = df.groupBy("userId").count.cache
    
    val frequent = counts
      .where($"count" > mean + 2 * sd)  // Adjust threshold based on actual dist.
      .alias("frequent")
      .join(df1, Seq("userId"))
    

    나머지:

    val infrequent = counts
      .where($"count" <= mean + 2 * sd)
      .alias("infrequent")
      .join(df1, Seq("userId"))
    

    그것은 예상 할 수있는 일이 정말? 그렇지 않은 경우, 문제의 상류의 소스를 식별하려고합니다.

    이 예상되는 경우, 당신은 시도 할 수 있습니다 :

    하지만 당신은 안 :

  2. from https://stackoverflow.com/questions/38517835/spark-final-task-takes-100x-times-longer-than-first-199-how-to-improve by cc-by-sa and MIT license