복붙노트

[SCALA] 스파크 DataFrame에 하나의 열에서 여러 열을 유도

SCALA

스파크 DataFrame에 하나의 열에서 여러 열을 유도

나는 Dataframe에서 단일 문자열 컬럼과 같은 거대한 구문 분석 메타 데이터와 DF를, ColmnA로, DFA를 호출 할 수 있습니다.

나는 함수, ClassXYZ = FUNC1 (ColmnA)를 통해 여러 열로이 열, ColmnA을 중단하고 싶습니다. 이 기능은 여러 개의 변수, 클래스 ClassXYZ을 반환하고, 각 변수는 이제 이러한 ColmnA1, ColmnA2 등 새로운 컬럼에 매핑 할 수있다

어떻게 단 한 번이 FUNC1를 호출하여 이러한 추가 열이 다른 1 Dataframe에서 이러한 변환을 수행하고 반복-은 모든 열을 생성 할 필요가 없다.

그 나는이 거대한 기능에 새로운 열을 추가 할 때마다 호출한다면 해결하기 쉬운,하지만 나는하지 않도록 할 것.

친절 작업 또는 의사 코드로 알려 주시기 바랍니다.

감사

산 제이

해결법

  1. ==============================

    1.일반적으로 당신이 원하는 것은 바로 수 없습니다 말하기. UDF는 번에 하나의 열을 반환 할 수 있습니다. 이 한계를 극복 할 수있는 두 가지 방법이 있습니다 :

    일반적으로 당신이 원하는 것은 바로 수 없습니다 말하기. UDF는 번에 하나의 열을 반환 할 수 있습니다. 이 한계를 극복 할 수있는 두 가지 방법이 있습니다 :

  2. ==============================

    2.함수가 후, 요소의 순서가 될 아래의 예를 제공한다고 가정 :

    함수가 후, 요소의 순서가 될 아래의 예를 제공한다고 가정 :

    val df = sc.parallelize(List(("Mike,1986,Toronto", 30), ("Andre,1980,Ottawa", 36), ("jill,1989,London", 27))).toDF("infoComb", "age")
    df.show
    +------------------+---+
    |          infoComb|age|
    +------------------+---+
    |Mike,1986,Toronto| 30|
    | Andre,1980,Ottawa| 36|
    |  jill,1989,London| 27|
    +------------------+---+
    

    지금 당신이이 infoComb로 할 수있는 것은 당신이 문자열을 분할 시작하고 더 많은 열을 얻을 수 있다는 것입니다 :

    df.select(expr("(split(infoComb, ','))[0]").cast("string").as("name"), expr("(split(infoComb, ','))[1]").cast("integer").as("yearOfBorn"), expr("(split(infoComb, ','))[2]").cast("string").as("city"), $"age").show
    +-----+----------+-------+---+
    | name|yearOfBorn|   city|age|
    +-----+----------+-------+---+
    |Mike|      1986|Toronto| 30|
    |Andre|      1980| Ottawa| 36|
    | jill|      1989| London| 27|
    +-----+----------+-------+---+
    

    도움이 되었기를 바랍니다.

  3. ==============================

    3.당신의 결과 열이 원래와 같은 길이 일 경우에는 withColumn 기능과 UDF를 적용하여 새로운 열을 만들 수 있습니다. 이 후에는 예를 들어 원래 열을 삭제할 수 있습니다 :

    당신의 결과 열이 원래와 같은 길이 일 경우에는 withColumn 기능과 UDF를 적용하여 새로운 열을 만들 수 있습니다. 이 후에는 예를 들어 원래 열을 삭제할 수 있습니다 :

     val newDf = myDf.withColumn("newCol1", myFun(myDf("originalColumn")))
    .withColumn("newCol2", myFun2(myDf("originalColumn"))
    .drop(myDf("originalColumn"))
    

    어디 myFun는 다음과 같이 정의 된 UDF는 다음과 같습니다

       def myFun= udf(
        (originalColumnContent : String) =>  {
          // do something with your original column content and return a new one
        }
      )
    
  4. ==============================

    4.나는 하나의 열을 평평하게하는 기능을 만들 선택한 다음 바로 UDF와 동시에 호출.

    나는 하나의 열을 평평하게하는 기능을 만들 선택한 다음 바로 UDF와 동시에 호출.

    먼저이 정의 :

    implicit class DfOperations(df: DataFrame) {
    
      def flattenColumn(col: String) = {
        def addColumns(df: DataFrame, cols: Array[String]): DataFrame = {
          if (cols.isEmpty) df
          else addColumns(
            df.withColumn(col + "_" + cols.head, df(col + "." + cols.head)),
            cols.tail
          )
        }
    
        val field = df.select(col).schema.fields(0)
        val newCols = field.dataType.asInstanceOf[StructType].fields.map(x => x.name)
    
        addColumns(df, newCols).drop(col)
      }
    
      def withColumnMany(colName: String, col: Column) = {
        df.withColumn(colName, col).flattenColumn(colName)
      }
    
    }
    

    그리고 사용법은 매우 간단합니다 :

    case class MyClass(a: Int, b: Int)
    
    val df = sc.parallelize(Seq(
      (0),
      (1)
    )).toDF("x")
    
    val f = udf((x: Int) => MyClass(x*2,x*3))
    
    df.withColumnMany("test", f($"x")).show()
    
    //  +---+------+------+
    //  |  x|test_a|test_b|
    //  +---+------+------+
    //  |  0|     0|     0|
    //  |  1|     2|     3|
    //  +---+------+------+
    
  5. ==============================

    5.이것은 쉽게 피봇 함수를 사용하여 달성 될 수있다

    이것은 쉽게 피봇 함수를 사용하여 달성 될 수있다

    df4.groupBy("year").pivot("course").sum("earnings").collect() 
    
  6. from https://stackoverflow.com/questions/32196207/derive-multiple-columns-from-a-single-column-in-a-spark-dataframe by cc-by-sa and MIT license