복붙노트

[PYTHON] PySpark 데이터 프레임에 새로운 열로 열 합계를 추가하십시오.

PYTHON

PySpark 데이터 프레임에 새로운 열로 열 합계를 추가하십시오.

나는 PySpark를 사용하고 있으며 일련의 숫자 열과 함께 Spark 데이터 프레임을 가지고 있습니다. 다른 모든 열의 합계 인 열을 추가하고 싶습니다.

내 데이터 프레임에 "a", "b"및 "c"열이 있다고 가정합니다. 나는 내가 이것을 할 수 있다는 것을 안다.

df.withColumn('total_col', df.a + df.b + df.c)

문제는 각 열을 개별적으로 입력하고 추가하는 것을 원하지 않는다는 것입니다. 특히 많은 열이있는 경우에는 특히 그렇습니다. 이 작업은 자동으로 수행하거나 추가 할 열 이름 목록을 지정하여 수행 할 수 있습니다. 이것을 할 또 다른 방법이 있습니까?

해결법

  1. ==============================

    1.이것은 분명하지 않았습니다. spark Dataframes API에 정의 된 열의 행 기반 합계를 볼 수 없습니다.

    이것은 분명하지 않았습니다. spark Dataframes API에 정의 된 열의 행 기반 합계를 볼 수 없습니다.

    이는 매우 간단한 방법으로 수행 할 수 있습니다.

    newdf = df.withColumn('total', sum(df[col] for col in df.columns))
    

    df.columns는 pyspark에 의해 Spark Dataframe의 모든 열 이름을 제공하는 문자열 목록으로 제공됩니다. 다른 합계의 경우 다른 열 이름 목록을 대신 제공 할 수 있습니다.

    나는 그것이 어떻게 행동 할지를 확신하지 못했기 때문에 이것을 첫 번째 해결책으로 시도하지 않았습니다. 그러나 그것은 효과적이다.

    이것은 지나치게 복잡하지만 잘 작동합니다.

    당신은 이것을 할 수 있습니다 :

    파이썬의 축소와 함께, 연산자 오버로딩이 어떻게 작동하는지에 대한 지식과 컬럼에 대한 pyspark 코드는 다음과 같습니다.

    def column_add(a,b):
         return  a.__add__(b)
    
    newdf = df.withColumn('total_col', 
             reduce(column_add, ( df[col] for col in df.columns ) ))
    

    이것은 파이썬 감소, 스파크 RDD 감소가 아니라는 것을 유의하십시오. 줄이기위한 두 번째 매개 변수의 괄호 항목은 목록 생성기 표현식이므로 괄호가 필요합니다.

    테스트를 거쳤습니다!

    $ pyspark
    >>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache()
    >>> df
    DataFrame[a: bigint, b: bigint, c: bigint]
    >>> df.columns
    ['a', 'b', 'c']
    >>> def column_add(a,b):
    ...     return a.__add__(b)
    ...
    >>> df.withColumn('total', reduce(column_add, ( df[col] for col in df.columns ) )).collect()
    [Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)]
    
  2. ==============================

    2.내 문제는 PySpark 데이터 프레임에 연속적인 열 합계를 새로운 열로 추가해야하기 때문에 위와 비슷했다 (좀 더 복잡하다). 이 접근법은 Paul의 버전 1의 코드를 사용합니다.

    내 문제는 PySpark 데이터 프레임에 연속적인 열 합계를 새로운 열로 추가해야하기 때문에 위와 비슷했다 (좀 더 복잡하다). 이 접근법은 Paul의 버전 1의 코드를 사용합니다.

    import pyspark
    from pyspark.sql import SparkSession
    import pandas as pd
    
    spark = SparkSession.builder.appName('addColAsCumulativeSUM').getOrCreate()
    df=spark.createDataFrame(data=[(1,2,3),(4,5,6),(3,2,1)\
                                  ,(6,1,-4),(0,2,-2),(6,4,1)\
                                  ,(4,5,2),(5,-3,-5),(6,4,-1)]\
                                  ,schema=['x1','x2','x3'])
    df.show()
    
    +---+---+---+
    | x1| x2| x3|
    +---+---+---+
    |  1|  2|  3|
    |  4|  5|  6|
    |  3|  2|  1|
    |  6|  1| -4|
    |  0|  2| -2|
    |  6|  4|  1|
    |  4|  5|  2|
    |  5| -3| -5|
    |  6|  4| -1|
    +---+---+---+
    
    colnames=df.columns
    

    누적 합계 (연속) 인 새 열 추가 :

    for i in range(0,len(colnames)):
        colnameLst= colnames[0:i+1]
        colname = 'cm'+ str(i+1)
        df = df.withColumn(colname, sum(df[col] for col in colnameLst))
    

    df.show()

    +---+---+---+---+---+---+
    | x1| x2| x3|cm1|cm2|cm3|
    +---+---+---+---+---+---+
    |  1|  2|  3|  1|  3|  6|
    |  4|  5|  6|  4|  9| 15|
    |  3|  2|  1|  3|  5|  6|
    |  6|  1| -4|  6|  7|  3|
    |  0|  2| -2|  0|  2|  0|
    |  6|  4|  1|  6| 10| 11|
    |  4|  5|  2|  4|  9| 11|
    |  5| -3| -5|  5|  2| -3|
    |  6|  4| -1|  6| 10|  9|
    +---+---+---+---+---+---+
    

    추가 된 누적 합계 열은 다음과 같습니다.

    cm1 = x1
    cm2 = x1 + x2
    cm3 = x1 + x2 + x3
    
  3. from https://stackoverflow.com/questions/31955309/add-column-sum-as-new-column-in-pyspark-dataframe by cc-by-sa and MIT license