[PYTHON] Pyspark가있는 Spark 데이터 프레임의 각 열에있는 NaN 이외의 항목 수 계산
PYTHONPyspark가있는 Spark 데이터 프레임의 각 열에있는 NaN 이외의 항목 수 계산
하이브에로드 된 매우 큰 데이터 집합이 있습니다. 그것은 약 190 만 행과 1450 열로 구성됩니다. 각 열의 "적용 범위", 즉 각 열의 NaN이 아닌 값을 가진 행의 비율을 결정해야합니다.
여기 내 코드가 있습니다 :
from pyspark import SparkContext
from pyspark.sql import HiveContext
import string as string
sc = SparkContext(appName="compute_coverages") ## Create the context
sqlContext = HiveContext(sc)
df = sqlContext.sql("select * from data_table")
nrows_tot = df.count()
covgs=sc.parallelize(df.columns)
.map(lambda x: str(x))
.map(lambda x: (x, float(df.select(x).dropna().count()) / float(nrows_tot) * 100.))
이를 covgs.take (10)을 실행하면 pyspark 셸에서 처리하려고하면 다소 큰 오류 스택이 반환됩니다. /usr/lib64/python2.6/pickle.py 파일에 저장하는 데 문제가 있다고합니다. 이것은 오류의 마지막 부분입니다.
py4j.protocol.Py4JError: An error occurred while calling o37.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
노력하고있는 것보다 더 좋은 방법이 있다면 제안에 대해 열어 두겠습니다. 내가 팬더를 사용할 수는 없지만 현재 작업중인 클러스터에서 사용할 수 없기 때문에 판다를 설치할 권한이 없습니다.
해결법
-
==============================
1.더미 데이터로 시작합시다.
더미 데이터로 시작합시다.
from pyspark.sql import Row row = Row("v", "x", "y", "z") df = sc.parallelize([ row(0.0, 1, 2, 3.0), row(None, 3, 4, 5.0), row(None, None, 6, 7.0), row(float("Nan"), 8, 9, float("NaN")) ]).toDF() ## +----+----+---+---+ ## | v| x| y| z| ## +----+----+---+---+ ## | 0.0| 1| 2|3.0| ## |null| 3| 4|5.0| ## |null|null| 6|7.0| ## | NaN| 8| 9|NaN| ## +----+----+---+---+
필요한 것은 간단한 집계뿐입니다.
from pyspark.sql.functions import col, count, isnan, lit, sum def count_not_null(c, nan_as_null=False): """Use conversion between boolean and integer - False -> 0 - True -> 1 """ pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True)) return sum(pred.cast("integer")).alias(c) df.agg(*[count_not_null(c) for c in df.columns]).show() ## +---+---+---+---+ ## | v| x| y| z| ## +---+---+---+---+ ## | 2| 3| 4| 4| ## +---+---+---+---+
또는 NaN을 NULL로 처리하려면 다음을 수행하십시오.
df.agg(*[count_not_null(c, True) for c in df.columns]).show() ## +---+---+---+---+ ## | v| x| y| z| ## +---+---+---+---+ ## | 1| 3| 4| 3| ## +---+---+---+---
SQL NULL 의미를 활용하여 사용자 정의 함수를 작성하지 않고 동일한 결과를 얻을 수도 있습니다.
df.agg(*[ count(c).alias(c) # vertical (column-wise) operations in SQL ignore NULLs for c in df.columns ]).show() ## +---+---+---+ ## | x| y| z| ## +---+---+---+ ## | 1| 2| 3| ## +---+---+---+
하지만 NaN에서는 작동하지 않습니다.
분수를 선호하는 경우 :
exprs = [(count_not_null(c) / count("*")).alias(c) for c in df.columns] df.agg(*exprs).show() ## +------------------+------------------+---+ ## | x| y| z| ## +------------------+------------------+---+ ## |0.3333333333333333|0.6666666666666666|1.0| ## +------------------+------------------+---+
또는
# COUNT(*) is equivalent to COUNT(1) so NULLs won't be an issue df.select(*[(count(c) / count("*")).alias(c) for c in df.columns]).show() ## +------------------+------------------+---+ ## | x| y| z| ## +------------------+------------------+---+ ## |0.3333333333333333|0.6666666666666666|1.0| ## +------------------+------------------+---+
from https://stackoverflow.com/questions/33900726/count-number-of-non-nan-entries-in-each-column-of-spark-dataframe-with-pyspark by cc-by-sa and MIT license
'PYTHON' 카테고리의 다른 글
[PYTHON] Mac OS X에서 Python 3 용 pip를 설치하는 방법은 무엇입니까? (0) | 2018.10.10 |
---|---|
[PYTHON] if-return-return 또는 if-else-return을 사용하는 것이 더 효율적입니까? (0) | 2018.10.10 |
[PYTHON] 판다 막대 막대에 막대 값에 주석 달기 (0) | 2018.10.10 |
[PYTHON] Celery (celerybeat)에 정기적 인 태스크를 동적으로 추가 / 제거하는 방법 (0) | 2018.10.10 |
[PYTHON] MySQL 데이터베이스에 데이터를 삽입하려면 어떻게해야합니까? (0) | 2018.10.10 |