[SCALA] 평균으로 누락 된 값을 교체 - 불꽃 Dataframe을
SCALA평균으로 누락 된 값을 교체 - 불꽃 Dataframe을
나는 일부 누락 된 값으로 스파크 Dataframe 있습니다. 그 컬럼에 대한 평균과 누락 값을 대체하여 간단한 전가을 실행하고 싶다. 나는이 논리를 구현하는 데 어려움을 겪고있다, 그래서 나는, 스파크에 아주 새로운 오전. 이것은 내가 지금까지 어떻게 관리해야 것입니다 :
A) (의이 골 발언권하자) 하나의 열이 작업을 수행하려면 코드 줄은 작동하는 것 같다 :
df.withColumn("new_Col", when($"ColA".isNull, df.select(mean("ColA"))
.first()(0).asInstanceOf[Double])
.otherwise($"ColA"))
나) 그러나, 나는 내 dataframe의 모든 열이 작업을 수행하는 방법을 알아낼 수 없었다. 나는지도 기능을 시도했지만, 나는 그것이 dataframe의 각 행을 통해 루프 믿는다
여기에 - C) SO에 비슷한 질문이 있습니다. 나는 (집계 테이블과 유착 사용) 솔루션을 좋아하면서, 나는 아주 (내가 그렇게 높은 순서 기능 등을 이용하여 각각의 칼럼을 통해 반복, R에서 온 각 열 통해 반복하여이 작업을 수행 할 수있는 방법이 있는지 알고 싶어했다 lapply은) 나에게 더 자연스러운 것 같다.
감사!
해결법
-
==============================
1.스파크> = 2.2
스파크> = 2.2
당신은 (평균과 중앙값 전략을 모두 지원) org.apache.spark.ml.feature.Imputer를 사용할 수 있습니다.
규모 :
import org.apache.spark.ml.feature.Imputer val imputer = new Imputer() .setInputCols(df.columns) .setOutputCols(df.columns.map(c => s"${c}_imputed")) .setStrategy("mean") imputer.fit(df).transform(df)
파이썬 :
from pyspark.ml.feature import Imputer imputer = Imputer( inputCols=df.columns, outputCols=["{}_imputed".format(c) for c in df.columns] ) imputer.fit(df).transform(df)
스파크 <2.2
여기 있습니다 :
import org.apache.spark.sql.functions.mean df.na.fill(df.columns.zip( df.select(df.columns.map(mean(_)): _*).first.toSeq ).toMap)
어디
df.columns.map(mean(_)): Array[Column]
각각의 컬럼에 대한 평균을 계산하고,
df.select(_: *).first.toSeq: Seq[Any]
서열에를 수집 집계 값과 변환 행을 [모든] (내가 아는 것이 최적이다 그러나 이것은 우리가 작업해야하는 API입니다)
df.columns.zip(_).toMap: Map[String,Any]
지도 [문자열, 모든 마지막으로 평균, 그리고에 열 이름에서지도하는 : AMAP가 생성 :
df.na.fill(_): DataFrame
사용 누락 값을 채운다 :
fill: Map[String, Any] => DataFrame
DataFrameNaFunctions에서.
NaN의 항목을 무시하려면 대체 할 수있다 :
df.select(df.columns.map(mean(_)): _*).first.toSeq
와:
import org.apache.spark.sql.functions.{col, isnan, when} df.select(df.columns.map( c => mean(when(!isnan(col(c)), col(c))) ): _*).first.toSeq
-
==============================
2.PySpark, 이것은 내가 사용하는 코드는 다음과 같습니다
PySpark, 이것은 내가 사용하는 코드는 다음과 같습니다
mean_dict = { col: 'mean' for col in df.columns } col_avgs = df.agg( mean_dict ).collect()[0].asDict() col_avgs = { k[4:-1]: v for k,v in col_avgs.iteritems() } df.fillna( col_avgs ).show()
네 가지 단계는 다음과 같습니다
-
==============================
3.PySpark에서 (평균값 대신에) 중간을 전가는 <2.2
PySpark에서 (평균값 대신에) 중간을 전가는 <2.2
## filter numeric cols num_cols = [col_type[0] for col_type in filter(lambda dtype: dtype[1] in {"bigint", "double", "int"}, df.dtypes)] ### Compute a dict with <col_name, median_value> median_dict = dict() for c in num_cols: median_dict[c] = df.stat.approxQuantile(c, [0.5], 0.001)[0]
그런 다음, na.fill 적용
df_imputed = df.na.fill(median_dict)
from https://stackoverflow.com/questions/40057563/replace-missing-values-with-mean-spark-dataframe by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 어떻게 스파크 dataframe에 목록 / 배열에서 새 열을 추가하는 아파치 불꽃 (0) | 2019.11.10 |
---|---|
[SCALA] 위 유형 경계에서 작동하지 않는 추상 형식을 무시 케이크 패턴 (0) | 2019.11.10 |
[SCALA] 어떻게 선물 꼬리 재귀를 포함하는 함수를 어떻게해야합니까? (0) | 2019.11.10 |
[SCALA] 업로드 된 파일의 경로를 얻는 방법 (0) | 2019.11.10 |
[SCALA] 왜 SparkContext.textFile의 파티션 매개 변수는 적용되지 않습니다? (0) | 2019.11.09 |