복붙노트

[SCALA] 스파크 DataFrame 그룹화 데이터의 표준 편차를 계산

SCALA

스파크 DataFrame 그룹화 데이터의 표준 편차를 계산

나는 내가 CSV에서 촬영하고 SparkSQL 쿼리 기능을 활용하기 위해 DataFrame로 변환 한 것을 사용자가 로그인 할 수 있습니다. 단일 사용자는 시간당 많은 항목을 만들 것입니다, 나는 각 사용자에 대한 기본적인 통계 정보를 수집하고 싶습니다; 정말 사용자 인스턴스의 수, 평균, 그리고 수많은 컬럼의 표준 편차. 나는 빨리 평균을 얻고 GROUPBY를 사용하여 정보를 계산 할 수 ($ "사용자")와 수 및 평균에 대한 SparkSQL 기능과 함께 집계했다 :

val meanData = selectedData.groupBy($"user").agg(count($"logOn"),
avg($"transaction"), avg($"submit"), avg($"submitsPerHour"), avg($"replies"),
avg($"repliesPerHour"), avg($"duration"))

그러나, 나는 표준 편차를 계산하기 위해 동등하게 우아한 방법을 찾을 수 없습니다. . 지금까지 난 단지 문자열, 이중 쌍 사용 StatCounter () 표준 편차 유틸리티를 매핑하여 계산할 수 있습니다 :

val stdevduration = duration.groupByKey().mapValues(value =>
org.apache.spark.util.StatCounter(value).stdev)

이 수익률은 RDD 그러나, 나는 더 쿼리가 반환 된 데이터에 가능하도록 시도하고 DataFrame에 모든 것을 유지하고 싶습니다.

해결법

  1. ==============================

    1.스파크 1.6

    스파크 1.6

    당신은 편견 샘플 표준 편차를 계산하는 계산 모집단 표준 편차 STDDEV / stddev_samp에 STDDEV_POP을 사용할 수 있습니다 :

    import org.apache.spark.sql.functions.{stddev_samp, stddev_pop}
    
    selectedData.groupBy($"user").agg(stdev_pop($"duration"))
    

    1.5 이하 (원래의 대답) 스파크 :

    안 꽤하고 있지만 수식을 사용합니다 (기술에서 리턴 값과 동일)으로 바이어스 :

    이 같은 작업을 수행 할 수 있습니다 :

    import org.apache.spark.sql.functions.sqrt
    
    selectedData
        .groupBy($"user")
        .agg((sqrt(
            avg($"duration" * $"duration") -
            avg($"duration") * avg($"duration")
         )).alias("duration_sd"))
    

    당신은 물론 혼란을 줄이기 위해 함수를 만들 수 있습니다 :

    import org.apache.spark.sql.Column
    def mySd(col: Column): Column = {
        sqrt(avg(col * col) - avg(col) * avg(col))
    }
    
    df.groupBy($"user").agg(mySd($"duration").alias("duration_sd"))
    

    하이브 UDF를 사용하는 것도 가능합니다 :

    df.registerTempTable("df")
    sqlContext.sql("""SELECT user, stddev(duration)
                      FROM df
                      GROUP BY user""")
    

    이미지 출처 : https://en.wikipedia.org/wiki/Standard_deviation

  2. ==============================

    2.(MRez에 의해 지적)는 오타가 같이 허용 코드는 컴파일되지 않습니다. 작품과 아래의 조각은 테스트됩니다.

    (MRez에 의해 지적)는 오타가 같이 허용 코드는 컴파일되지 않습니다. 작품과 아래의 조각은 테스트됩니다.

    2.0 스파크 :

    import org.apache.spark.sql.functions._
    val _avg_std = df.groupBy("user").agg(
            avg(col("duration").alias("avg")),
            stddev(col("duration").alias("stdev")),
            stddev_pop(col("duration").alias("stdev_pop")),
            stddev_samp(col("duration").alias("stdev_samp"))
            )
    
  3. from https://stackoverflow.com/questions/31789939/calculate-the-standard-deviation-of-grouped-data-in-a-spark-dataframe by cc-by-sa and MIT license