복붙노트

[SCALA] 불꽃의 모든 열 / 행 전체를 처리 UDF

SCALA

불꽃의 모든 열 / 행 전체를 처리 UDF

문자열과 숫자 데이터 유형의 혼합을 포함하는 dataframe를 들어, 목표는 그들 모두의 minhash하는 새로운 기능 열을 만드는 것입니다.

이것은 dataframe.toRDD을 수행하여 수행 할 수 있지만 다음 단계는 단순히 dataframe에 RDD 백을 변환 할 때 그렇게 비싼 것입니다.

그래서 다음과 같은 라인을 따라 UDF를 할 수있는 방법이있다 :

val wholeRowUdf = udf( (row: Row) =>  computeHash(row))

행 물론 스파크 SQL 데이터 형식 아니다 - 같이이 작동하지 않을 수 있도록.

나는 그것을 실현 업데이트 / 명확하게 해 전체 행 UDF를 쉽게 만들 수 withColumn 내에서 실행하는. 무엇 분명하지 것은 스파크 SQL 문 내에서 사용할 수있는 것입니다 :

val featurizedDf = spark.sql("select wholeRowUdf( what goes here? ) as features 
                              from mytable")

해결법

  1. ==============================

    1.난 당신이 구조체의 붙박이 기능을 사용하여 UDF 함수에 모든 열 또는 선택된 열을 전달하는 행을 사용할 수 있음을 보여 하겠어

    난 당신이 구조체의 붙박이 기능을 사용하여 UDF 함수에 모든 열 또는 선택된 열을 전달하는 행을 사용할 수 있음을 보여 하겠어

    우선은 dataframe를 정의

    val df = Seq(
      ("a", "b", "c"),
      ("a1", "b1", "c1")
    ).toDF("col1", "col2", "col3")
    //    +----+----+----+
    //    |col1|col2|col3|
    //    +----+----+----+
    //    |a   |b   |c   |
    //    |a1  |b1  |c1  |
    //    +----+----+----+
    

    그 때 나는 하나의 문자열로 구분와 같은 기능, 행의 모든 ​​요소를 ​​만들기 위해 정의 (당신이 computeHash 기능을 가지고)

    import org.apache.spark.sql.Row
    def concatFunc(row: Row) = row.mkString(", ")
    

    그럼 난 UDF 함수에서 사용

    import org.apache.spark.sql.functions._
    def combineUdf = udf((row: Row) => concatFunc(row))
    

    마지막 I는 withColumn 함수를 사용하여 UDF 함수를 호출 한 열로 선택된 열을 조합 붙박이 기능 구조체 및 UDF 함수에 전달

    df.withColumn("contcatenated", combineUdf(struct(col("col1"), col("col2"), col("col3")))).show(false)
    //    +----+----+----+-------------+
    //    |col1|col2|col3|contcatenated|
    //    +----+----+----+-------------+
    //    |a   |b   |c   |a, b, c      |
    //    |a1  |b1  |c1  |a1, b1, c1   |
    //    +----+----+----+-------------+
    

    당신은 행 인수로 전체 열을 전달하는 데 사용할 수 있음을 볼 수 있도록

    당신은 한 번에 한 행의 모든 ​​열을 전달할 수 있습니다

    val columns = df.columns
    df.withColumn("contcatenated", combineUdf(struct(columns.map(col): _*)))
    

    업데이트

    당신은 당신이하는 것처럼 UDF 함수를 등록 할 필요도 SQL 쿼리와 같은 얻을 수 있습니다

    df.createOrReplaceTempView("tempview")
    sqlContext.udf.register("combineUdf", combineUdf)
    sqlContext.sql("select *, combineUdf(struct(`col1`, `col2`, `col3`)) as concatenated from tempview")
    

    그것은 당신에게 상기와 같은 결과를 줄 것이다

    당신이 열 이름을 하드 코딩하지 않으려는 이제 경우에 당신은 당신의 욕망에 따라 열 이름을 선택하고 문자열을 만들 수 있습니다

    val columns = df.columns.map(x => "`"+x+"`").mkString(",")
    sqlContext.sql(s"select *, combineUdf(struct(${columns})) as concatenated from tempview")
    

    나는 대답은 도움이 희망

  2. ==============================

    2.내가 해결 해낸 : 새 출력 열을 생성하는 기존의 스파크 SQL 함수에 열 이름을 드롭 :

    내가 해결 해낸 : 새 출력 열을 생성하는 기존의 스파크 SQL 함수에 열 이름을 드롭 :

    concat(${df.columns.tail.mkString(",'-',")}) as Features
    

    이 경우 dataframe에서 첫 번째 열은 대상 및 제외 하였다. 즉,이 접근법의 또 다른 장점 : 많은 컬럼의 실제 목록을 조작 할 수.

    이 방법은 RDD / dataframes의 불필요한 구조 조정을 피할 수 있습니다.

  3. from https://stackoverflow.com/questions/49434647/process-all-columns-the-entire-row-in-a-spark-udf by cc-by-sa and MIT license