복붙노트

[PYTHON] Pyspark가있는 Spark 데이터 프레임의 각 열에있는 NaN 이외의 항목 수 계산

PYTHON

Pyspark가있는 Spark 데이터 프레임의 각 열에있는 NaN 이외의 항목 수 계산

하이브에로드 된 매우 큰 데이터 집합이 있습니다. 그것은 약 190 만 행과 1450 열로 구성됩니다. 각 열의 "적용 범위", 즉 각 열의 NaN이 아닌 값을 가진 행의 비율을 결정해야합니다.

여기 내 코드가 있습니다 :

from pyspark import SparkContext
from pyspark.sql import HiveContext
import string as string

sc = SparkContext(appName="compute_coverages") ## Create the context
sqlContext = HiveContext(sc)

df = sqlContext.sql("select * from data_table")
nrows_tot = df.count()

covgs=sc.parallelize(df.columns)
        .map(lambda x: str(x))
        .map(lambda x: (x, float(df.select(x).dropna().count()) / float(nrows_tot) * 100.))

이를 covgs.take (10)을 실행하면 pyspark 셸에서 처리하려고하면 다소 큰 오류 스택이 반환됩니다. /usr/lib64/python2.6/pickle.py 파일에 저장하는 데 문제가 있다고합니다. 이것은 오류의 마지막 부분입니다.

py4j.protocol.Py4JError: An error occurred while calling o37.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
        at py4j.Gateway.invoke(Gateway.java:252)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)

노력하고있는 것보다 더 좋은 방법이 있다면 제안에 대해 열어 두겠습니다. 내가 팬더를 사용할 수는 없지만 현재 작업중인 클러스터에서 사용할 수 없기 때문에 판다를 설치할 권한이 없습니다.

해결법

  1. ==============================

    1.더미 데이터로 시작합시다.

    더미 데이터로 시작합시다.

    from pyspark.sql import Row
    
    row = Row("v", "x", "y", "z")
    df = sc.parallelize([
        row(0.0, 1, 2, 3.0), row(None, 3, 4, 5.0),
        row(None, None, 6, 7.0), row(float("Nan"), 8, 9, float("NaN"))
    ]).toDF()
    
    ## +----+----+---+---+
    ## |   v|   x|  y|  z|
    ## +----+----+---+---+
    ## | 0.0|   1|  2|3.0|
    ## |null|   3|  4|5.0|
    ## |null|null|  6|7.0|
    ## | NaN|   8|  9|NaN|
    ## +----+----+---+---+
    

    필요한 것은 간단한 집계뿐입니다.

    from pyspark.sql.functions import col, count, isnan, lit, sum
    
    def count_not_null(c, nan_as_null=False):
        """Use conversion between boolean and integer
        - False -> 0
        - True ->  1
        """
        pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True))
        return sum(pred.cast("integer")).alias(c)
    
    df.agg(*[count_not_null(c) for c in df.columns]).show()
    
    ## +---+---+---+---+
    ## |  v|  x|  y|  z|
    ## +---+---+---+---+
    ## |  2|  3|  4|  4|
    ## +---+---+---+---+
    

    또는 NaN을 NULL로 처리하려면 다음을 수행하십시오.

    df.agg(*[count_not_null(c, True) for c in df.columns]).show()
    
    ## +---+---+---+---+
    ## |  v|  x|  y|  z|
    ## +---+---+---+---+
    ## |  1|  3|  4|  3|
    ## +---+---+---+---
    

    SQL NULL 의미를 활용하여 사용자 정의 함수를 작성하지 않고 동일한 결과를 얻을 수도 있습니다.

    df.agg(*[
        count(c).alias(c)    # vertical (column-wise) operations in SQL ignore NULLs
        for c in df.columns
    ]).show()
    
    ## +---+---+---+
    ## |  x|  y|  z|
    ## +---+---+---+
    ## |  1|  2|  3|
    ## +---+---+---+
    

    하지만 NaN에서는 작동하지 않습니다.

    분수를 선호하는 경우 :

    exprs = [(count_not_null(c) / count("*")).alias(c) for c in df.columns]
    df.agg(*exprs).show()
    
    ## +------------------+------------------+---+
    ## |                 x|                 y|  z|
    ## +------------------+------------------+---+
    ## |0.3333333333333333|0.6666666666666666|1.0|
    ## +------------------+------------------+---+
    

    또는

    # COUNT(*) is equivalent to COUNT(1) so NULLs won't be an issue
    df.select(*[(count(c) / count("*")).alias(c) for c in df.columns]).show()
    
    ## +------------------+------------------+---+
    ## |                 x|                 y|  z|
    ## +------------------+------------------+---+
    ## |0.3333333333333333|0.6666666666666666|1.0|
    ## +------------------+------------------+---+
    
  2. from https://stackoverflow.com/questions/33900726/count-number-of-non-nan-entries-in-each-column-of-spark-dataframe-with-pyspark by cc-by-sa and MIT license