복붙노트

[SQL] (전치?)를 폭발 스파크 SQL 테이블에서 여러 열을

SQL

(전치?)를 폭발 스파크 SQL 테이블에서 여러 열을

나는 스파크 SQL을 사용하고 (나는 그것이 SQL 구문에 영향을 미치는 경우에는 스파크에 언급 - 나는 확실히 아직 할 수있는 익숙하지 충분 해요) 그리고 내가 다시 구조에 노력하고있는 테이블을 가지고 있지만 난 점점 동시에 여러 열을 바꾸어하려고 붙어.

기본적으로 나는 외모가 좋아하는 데이터가 :

userId    someString      varA     varB
   1      "example1"    [0,2,5]   [1,2,9]
   2      "example2"    [1,20,5]  [9,null,6]

나는 (길이가 항상 일치합니다)를 동시에 VARA 및 varB 모두를 폭발하고 싶습니다 -이 같은 그 최종 출력 외모 :

userId    someString      varA     varB
   1      "example1"       0         1
   1      "example1"       2         2
   1      "example1"       5         9
   2      "example2"       1         9
   2      "example2"       20       null
   2      "example2"       5         6

하지만 난 단 하나의 명령에 작업에 하나의 폭발 (VAR) 문을 얻을 것, 나는 체인 그들에게하려고하면 (즉, 첫 번째 폭발 명령 후에 임시 테이블을 만들 수) 나는 분명히 중복, 불필요한의 거대한 숫자를 얻을 수 있습니다 행.

많은 감사합니다!

해결법

  1. ==============================

    1.스파크> = 2.4

    스파크> = 2.4

    당신은 우편 UDF 및 사용 arrays_zip 기능을 건너 뛸 수 있습니다 :

    df.withColumn("vars", explode(arrays_zip($"varA", $"varB"))).select(
      $"userId", $"someString",
      $"vars.varA", $"vars.varB").show
    

    스파크 <2.4

    당신이 원하는 것은 사용자 정의 UDF없이 할 수 없습니다. 스칼라에서 당신이 뭔가를 할 수 있습니다 :

    val data = sc.parallelize(Seq(
        """{"userId": 1, "someString": "example1",
            "varA": [0, 2, 5], "varB": [1, 2, 9]}""",
        """{"userId": 2, "someString": "example2",
            "varA": [1, 20, 5], "varB": [9, null, 6]}"""
    ))
    
    val df = spark.read.json(data)
    
    df.printSchema
    // root
    //  |-- someString: string (nullable = true)
    //  |-- userId: long (nullable = true)
    //  |-- varA: array (nullable = true)
    //  |    |-- element: long (containsNull = true)
    //  |-- varB: array (nullable = true)
    //  |    |-- element: long (containsNull = true)
    

    이제 우리는 지퍼를 UDF 정의 할 수 있습니다 :

    import org.apache.spark.sql.functions.{udf, explode}
    
    val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))
    
    df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
       $"userId", $"someString",
       $"vars._1".alias("varA"), $"vars._2".alias("varB")).show
    
    // +------+----------+----+----+
    // |userId|someString|varA|varB|
    // +------+----------+----+----+
    // |     1|  example1|   0|   1|
    // |     1|  example1|   2|   2|
    // |     1|  example1|   5|   9|
    // |     2|  example2|   1|   9|
    // |     2|  example2|  20|null|
    // |     2|  example2|   5|   6|
    // +------+----------+----+----+
    

    원시 SQL로 :

    sqlContext.udf.register("zip", (xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))
    df.registerTempTable("df")
    
    sqlContext.sql(
      """SELECT userId, someString, explode(zip(varA, varB)) AS vars FROM df""")
    
  2. ==============================

    2.당신은 또한 시도 할 수

    당신은 또한 시도 할 수

    case class Input(
     userId: Integer,
     someString: String,
     varA: Array[Integer],
     varB: Array[Integer])
    
    case class Result(
     userId: Integer,
     someString: String,
     varA: Integer,
     varB: Integer)
    
    def getResult(row : Input) : Iterable[Result] = {
     val user_id = row.user_id
     val someString = row.someString
     val varA = row.varA
     val varB = row.varB
     val seq = for( i <- 0 until varA.size) yield {Result(user_id,someString,varA(i),varB(i))}
     seq
     }
    
    val obj1 = Input(1, "string1", Array(0, 2, 5), Array(1, 2, 9))
    val obj2 = Input(2, "string2", Array(1, 3, 6), Array(2, 3, 10))
    val input_df = sc.parallelize(Seq(obj1, obj2)).toDS
    
    val res = input_df.flatMap{ row => getResult(row) }
    res.show
    // +------+----------+----+-----+
    // |userId|someString|varA|varB |
    // +------+----------+----+-----+
    // |     1|  string1 |   0|   1 |
    // |     1|  string1 |   2|   2 |
    // |     1|  string1 |   5|   9 |
    // |     2|  string2 |   1|   2 |
    // |     2|  string2 |   3|   3 |
    // |     2|  string2 |   6|   10|
    // +------+----------+----+-----+
    
  3. from https://stackoverflow.com/questions/33220916/explode-transpose-multiple-columns-in-spark-sql-table by cc-by-sa and MIT license