[PYTHON] Pyspark : 행으로 여러 배열 열을 분할
PYTHONPyspark : 행으로 여러 배열 열을 분할
하나의 행과 여러 개의 열이있는 데이터 프레임이 있습니다. 일부 열은 단일 값이고 다른 열은 목록입니다. 모든 목록 열은 동일한 길이입니다. 비 목록 열을 그대로 유지하면서 각 목록 열을 별도의 행으로 분할하려고합니다.
샘플 DF :
df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')])
# +---+---------+---------+---+
# | a| b| c| d|
# +---+---------+---------+---+
# | 1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+
내가 원하는 것 :
+---+---+----+------+
| a| b| c | d |
+---+---+----+------+
| 1| 1| 7 | foo |
| 1| 2| 8 | foo |
| 1| 3| 9 | foo |
+---+---+----+------+
목록 열이 하나만있는 경우 폭발하는 것만으로도 간단합니다.
df_exploded = df.withColumn('b', explode('b'))
# >>> df_exploded.show()
# +---+---+---------+---+
# | a| b| c| d|
# +---+---+---------+---+
# | 1| 1|[7, 8, 9]|foo|
# | 1| 2|[7, 8, 9]|foo|
# | 1| 3|[7, 8, 9]|foo|
# +---+---+---------+---+
그러나 C 열을 폭발 시키려고해도 원하는 길이의 사각형이있는 데이터 프레임이 생깁니다.
df_exploded_again = df_exploded.withColumn('c', explode('c'))
# >>> df_exploded_again.show()
# +---+---+---+---+
# | a| b| c| d|
# +---+---+---+---+
# | 1| 1| 7|foo|
# | 1| 1| 8|foo|
# | 1| 1| 9|foo|
# | 1| 2| 7|foo|
# | 1| 2| 8|foo|
# | 1| 2| 9|foo|
# | 1| 3| 7|foo|
# | 1| 3| 8|foo|
# | 1| 3| 9|foo|
# +---+---+---+---+
내가 원하는 것은 - 각 열에 대해 해당 열의 배열의 n 번째 요소를 가져 와서 새 행에 추가합니다. 데이터 프레임의 모든 열에 걸쳐 폭발을 매핑하려고했지만 그 중 하나가 작동하지 않는 것 같습니다.
df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF()
해결법
-
==============================
1.스파크> = 2.4
스파크> = 2.4
zip_ udf를 arrays_zip 함수로 대체 할 수 있습니다.
from pyspark.sql.functions import arrays_zip (df .withColumn("tmp", arrays_zip("b", "c")) .withColumn("tmp", explode("tmp")) .select("a", col("tmp.b"), col("tmp.c"), "d"))
스파크 <2.4
DataFrames 및 UDF 사용 :
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType from pyspark.sql.functions import col, udf, explode zip_ = udf( lambda x, y: list(zip(x, y)), ArrayType(StructType([ # Adjust types to reflect data types StructField("first", IntegerType()), StructField("second", IntegerType()) ])) ) (df .withColumn("tmp", zip_("b", "c")) # UDF output cannot be directly passed to explode .withColumn("tmp", explode("tmp")) .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))
RDD 사용 :
(df .rdd .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)]) .toDF(["a", "b", "c", "d"]))
두 솔루션 모두 파이썬 통신 오버 헤드로 인해 비효율적입니다. 데이터 크기가 고정되어 있으면 다음과 같이 할 수 있습니다.
from functools import reduce from pyspark.sql import DataFrame # Length of array n = 3 # For legacy Python you'll need a separate function # in place of method accessor reduce( DataFrame.unionAll, (df.select("a", col("b").getItem(i), col("c").getItem(i), "d") for i in range(n)) ).toDF("a", "b", "c", "d")
또는:
from pyspark.sql.functions import array, struct # SQL level zip of arrays of known size # followed by explode tmp = explode(array(*[ struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c")) for i in range(n) ])) (df .withColumn("tmp", tmp) .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))
이것은 UDF 또는 RDD에 비해 상당히 빠릅니다. 임의의 수의 열을 지원하기 위해 일반화되었습니다.
# This uses keyword only arguments # If you use legacy Python you'll have to change signature # Body of the function can stay the same def zip_and_explode(*colnames, n): return explode(array(*[ struct(*[col(c).getItem(i).alias(c) for c in colnames]) for i in range(n) ])) df.withColumn("tmp", zip_and_explode("b", "c", n=3))
-
==============================
2.각 입력 행에서 여러 개의 출력 행을 만들려는 것처럼 맵핑하지 않고 flatMap을 사용해야합니다.
각 입력 행에서 여러 개의 출력 행을 만들려는 것처럼 맵핑하지 않고 flatMap을 사용해야합니다.
from pyspark.sql import Row def dualExplode(r): rowDict = r.asDict() bList = rowDict.pop('b') cList = rowDict.pop('c') for b,c in zip(bList, cList): newDict = dict(rowDict) newDict['b'] = b newDict['c'] = c yield Row(**newDict) df_split = sqlContext.createDataFrame(df.rdd.flatMap(dualExplode))
from https://stackoverflow.com/questions/41027315/pyspark-split-multiple-array-columns-into-rows by cc-by-sa and MIT license
'PYTHON' 카테고리의 다른 글
[PYTHON] ATLAS / MKL을 설치된 Numpy에 연결하십시오 (0) | 2018.10.08 |
---|---|
[PYTHON] ASCII가 아닌 문자를 제거하고 파이썬을 사용하여 마침표와 공백을 남기려면 어떻게합니까? (0) | 2018.10.07 |
[PYTHON] 다양한 깊이의 멀티 레벨 defaultdict? (0) | 2018.10.07 |
[PYTHON] Python pandas를 사용하여 날짜 및 시간 열 결합 (0) | 2018.10.07 |
[PYTHON] urllib.urlopen () 다음에 close ()를 호출해야합니까? (0) | 2018.10.07 |