[SQL] (전치?)를 폭발 스파크 SQL 테이블에서 여러 열을
SQL(전치?)를 폭발 스파크 SQL 테이블에서 여러 열을
나는 스파크 SQL을 사용하고 (나는 그것이 SQL 구문에 영향을 미치는 경우에는 스파크에 언급 - 나는 확실히 아직 할 수있는 익숙하지 충분 해요) 그리고 내가 다시 구조에 노력하고있는 테이블을 가지고 있지만 난 점점 동시에 여러 열을 바꾸어하려고 붙어.
기본적으로 나는 외모가 좋아하는 데이터가 :
userId someString varA varB
1 "example1" [0,2,5] [1,2,9]
2 "example2" [1,20,5] [9,null,6]
나는 (길이가 항상 일치합니다)를 동시에 VARA 및 varB 모두를 폭발하고 싶습니다 -이 같은 그 최종 출력 외모 :
userId someString varA varB
1 "example1" 0 1
1 "example1" 2 2
1 "example1" 5 9
2 "example2" 1 9
2 "example2" 20 null
2 "example2" 5 6
하지만 난 단 하나의 명령에 작업에 하나의 폭발 (VAR) 문을 얻을 것, 나는 체인 그들에게하려고하면 (즉, 첫 번째 폭발 명령 후에 임시 테이블을 만들 수) 나는 분명히 중복, 불필요한의 거대한 숫자를 얻을 수 있습니다 행.
많은 감사합니다!
해결법
-
==============================
1.스파크> = 2.4
스파크> = 2.4
당신은 우편 UDF 및 사용 arrays_zip 기능을 건너 뛸 수 있습니다 :
df.withColumn("vars", explode(arrays_zip($"varA", $"varB"))).select( $"userId", $"someString", $"vars.varA", $"vars.varB").show
스파크 <2.4
당신이 원하는 것은 사용자 정의 UDF없이 할 수 없습니다. 스칼라에서 당신이 뭔가를 할 수 있습니다 :
val data = sc.parallelize(Seq( """{"userId": 1, "someString": "example1", "varA": [0, 2, 5], "varB": [1, 2, 9]}""", """{"userId": 2, "someString": "example2", "varA": [1, 20, 5], "varB": [9, null, 6]}""" )) val df = spark.read.json(data) df.printSchema // root // |-- someString: string (nullable = true) // |-- userId: long (nullable = true) // |-- varA: array (nullable = true) // | |-- element: long (containsNull = true) // |-- varB: array (nullable = true) // | |-- element: long (containsNull = true)
이제 우리는 지퍼를 UDF 정의 할 수 있습니다 :
import org.apache.spark.sql.functions.{udf, explode} val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys)) df.withColumn("vars", explode(zip($"varA", $"varB"))).select( $"userId", $"someString", $"vars._1".alias("varA"), $"vars._2".alias("varB")).show // +------+----------+----+----+ // |userId|someString|varA|varB| // +------+----------+----+----+ // | 1| example1| 0| 1| // | 1| example1| 2| 2| // | 1| example1| 5| 9| // | 2| example2| 1| 9| // | 2| example2| 20|null| // | 2| example2| 5| 6| // +------+----------+----+----+
원시 SQL로 :
sqlContext.udf.register("zip", (xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys)) df.registerTempTable("df") sqlContext.sql( """SELECT userId, someString, explode(zip(varA, varB)) AS vars FROM df""")
-
==============================
2.당신은 또한 시도 할 수
당신은 또한 시도 할 수
case class Input( userId: Integer, someString: String, varA: Array[Integer], varB: Array[Integer]) case class Result( userId: Integer, someString: String, varA: Integer, varB: Integer) def getResult(row : Input) : Iterable[Result] = { val user_id = row.user_id val someString = row.someString val varA = row.varA val varB = row.varB val seq = for( i <- 0 until varA.size) yield {Result(user_id,someString,varA(i),varB(i))} seq } val obj1 = Input(1, "string1", Array(0, 2, 5), Array(1, 2, 9)) val obj2 = Input(2, "string2", Array(1, 3, 6), Array(2, 3, 10)) val input_df = sc.parallelize(Seq(obj1, obj2)).toDS val res = input_df.flatMap{ row => getResult(row) } res.show // +------+----------+----+-----+ // |userId|someString|varA|varB | // +------+----------+----+-----+ // | 1| string1 | 0| 1 | // | 1| string1 | 2| 2 | // | 1| string1 | 5| 9 | // | 2| string2 | 1| 2 | // | 2| string2 | 3| 3 | // | 2| string2 | 6| 10| // +------+----------+----+-----+
from https://stackoverflow.com/questions/33220916/explode-transpose-multiple-columns-in-spark-sql-table by cc-by-sa and MIT license
'SQL' 카테고리의 다른 글
[SQL] 매개 변수화 된 쿼리가 공급되지 않은 매개 변수를 기대 (0) | 2020.04.20 |
---|---|
[SQL] 어떻게 변환 쉼표로 2005 SQL 서버에서 테이블 레코드에 NVARCHAR을 분리? (0) | 2020.04.20 |
[SQL] SQL : 변수에 따라 선택 동적 열 이름 (0) | 2020.04.20 |
[SQL] 어떻게 스파크 RDD에 대한 SQL ROW_NUMBER 동등한를받을 수 있나요? (0) | 2020.04.20 |
[SQL] "최고의 경기"에 의해 MySQL의 순서 (0) | 2020.04.20 |