복붙노트

[SCALA] 평균으로 누락 된 값을 교체 - 불꽃 Dataframe을

SCALA

평균으로 누락 된 값을 교체 - 불꽃 Dataframe을

나는 일부 누락 된 값으로 스파크 Dataframe 있습니다. 그 컬럼에 대한 평균과 누락 값을 대체하여 간단한 전가을 실행하고 싶다. 나는이 논리를 구현하는 데 어려움을 겪고있다, 그래서 나는, 스파크에 아주 새로운 오전. 이것은 내가 지금까지 어떻게 관리해야 것입니다 :

A) (의이 골 발언권하자) 하나의 열이 작업을 수행하려면 코드 줄은 작동하는 것 같다 :

df.withColumn("new_Col", when($"ColA".isNull, df.select(mean("ColA"))
  .first()(0).asInstanceOf[Double])
  .otherwise($"ColA"))

나) 그러나, 나는 내 dataframe의 모든 열이 작업을 수행하는 방법을 알아낼 수 없었다. 나는지도 기능을 시도했지만, 나는 그것이 dataframe의 각 행을 통해 루프 믿는다

여기에 - C) SO에 비슷한 질문이 있습니다. 나는 (집계 테이블과 유착 사용) 솔루션을 좋아하면서, 나는 아주 (내가 그렇게 높은 순서 기능 등을 이용하여 각각의 칼럼을 통해 반복, R에서 온 각 열 통해 반복하여이 작업을 수행 할 수있는 방법이 있는지 알고 싶어했다 lapply은) 나에게 더 자연스러운 것 같다.

감사!

해결법

  1. ==============================

    1.스파크> = 2.2

    스파크> = 2.2

    당신은 (평균과 중앙값 전략을 모두 지원) org.apache.spark.ml.feature.Imputer를 사용할 수 있습니다.

    규모 :

    import org.apache.spark.ml.feature.Imputer
    
    val imputer = new Imputer()
      .setInputCols(df.columns)
      .setOutputCols(df.columns.map(c => s"${c}_imputed"))
      .setStrategy("mean")
    
    imputer.fit(df).transform(df)
    

    파이썬 :

    from pyspark.ml.feature import Imputer
    
    imputer = Imputer(
        inputCols=df.columns, 
        outputCols=["{}_imputed".format(c) for c in df.columns]
    )
    imputer.fit(df).transform(df)
    

    스파크 <2.2

    여기 있습니다 :

    import org.apache.spark.sql.functions.mean
    
    df.na.fill(df.columns.zip(
      df.select(df.columns.map(mean(_)): _*).first.toSeq
    ).toMap)
    

    어디

    df.columns.map(mean(_)): Array[Column] 
    

    각각의 컬럼에 대한 평균을 계산하고,

    df.select(_: *).first.toSeq: Seq[Any]
    

    서열에를 수집 집계 값과 변환 행을 [모든] (내가 아는 것이 최적이다 그러나 이것은 우리가 작업해야하는 API입니다)

    df.columns.zip(_).toMap: Map[String,Any] 
    

    지도 [문자열, 모든 마지막으로 평균, 그리고에 열 이름에서지도하는 : AMAP가 생성 :

    df.na.fill(_): DataFrame
    

    사용 누락 값을 채운다 :

    fill: Map[String, Any] => DataFrame 
    

    DataFrameNaFunctions에서.

    NaN의 항목을 무시하려면 대체 할 수있다 :

    df.select(df.columns.map(mean(_)): _*).first.toSeq
    

    와:

    import org.apache.spark.sql.functions.{col, isnan, when}
    
    
    df.select(df.columns.map(
      c => mean(when(!isnan(col(c)), col(c)))
    ): _*).first.toSeq
    
  2. ==============================

    2.PySpark, 이것은 내가 사용하는 코드는 다음과 같습니다

    PySpark, 이것은 내가 사용하는 코드는 다음과 같습니다

    mean_dict = { col: 'mean' for col in df.columns }
    col_avgs = df.agg( mean_dict ).collect()[0].asDict()
    col_avgs = { k[4:-1]: v for k,v in col_avgs.iteritems() }
    df.fillna( col_avgs ).show()
    

    네 가지 단계는 다음과 같습니다

  3. ==============================

    3.PySpark에서 (평균값 대신에) 중간을 전가는 <2.2

    PySpark에서 (평균값 대신에) 중간을 전가는 <2.2

    ## filter numeric cols
    num_cols = [col_type[0] for col_type in filter(lambda dtype: dtype[1] in {"bigint", "double", "int"}, df.dtypes)]
    ### Compute a dict with <col_name, median_value>
    median_dict = dict()
    for c in num_cols:
       median_dict[c] = df.stat.approxQuantile(c, [0.5], 0.001)[0]
    

    그런 다음, na.fill 적용

    df_imputed = df.na.fill(median_dict)
    
  4. from https://stackoverflow.com/questions/40057563/replace-missing-values-with-mean-spark-dataframe by cc-by-sa and MIT license