[SCALA] 불꽃의 모든 열 / 행 전체를 처리 UDF
SCALA불꽃의 모든 열 / 행 전체를 처리 UDF
문자열과 숫자 데이터 유형의 혼합을 포함하는 dataframe를 들어, 목표는 그들 모두의 minhash하는 새로운 기능 열을 만드는 것입니다.
이것은 dataframe.toRDD을 수행하여 수행 할 수 있지만 다음 단계는 단순히 dataframe에 RDD 백을 변환 할 때 그렇게 비싼 것입니다.
그래서 다음과 같은 라인을 따라 UDF를 할 수있는 방법이있다 :
val wholeRowUdf = udf( (row: Row) => computeHash(row))
행 물론 스파크 SQL 데이터 형식 아니다 - 같이이 작동하지 않을 수 있도록.
나는 그것을 실현 업데이트 / 명확하게 해 전체 행 UDF를 쉽게 만들 수 withColumn 내에서 실행하는. 무엇 분명하지 것은 스파크 SQL 문 내에서 사용할 수있는 것입니다 :
val featurizedDf = spark.sql("select wholeRowUdf( what goes here? ) as features
from mytable")
해결법
-
==============================
1.난 당신이 구조체의 붙박이 기능을 사용하여 UDF 함수에 모든 열 또는 선택된 열을 전달하는 행을 사용할 수 있음을 보여 하겠어
난 당신이 구조체의 붙박이 기능을 사용하여 UDF 함수에 모든 열 또는 선택된 열을 전달하는 행을 사용할 수 있음을 보여 하겠어
우선은 dataframe를 정의
val df = Seq( ("a", "b", "c"), ("a1", "b1", "c1") ).toDF("col1", "col2", "col3") // +----+----+----+ // |col1|col2|col3| // +----+----+----+ // |a |b |c | // |a1 |b1 |c1 | // +----+----+----+
그 때 나는 하나의 문자열로 구분와 같은 기능, 행의 모든 요소를 만들기 위해 정의 (당신이 computeHash 기능을 가지고)
import org.apache.spark.sql.Row def concatFunc(row: Row) = row.mkString(", ")
그럼 난 UDF 함수에서 사용
import org.apache.spark.sql.functions._ def combineUdf = udf((row: Row) => concatFunc(row))
마지막 I는 withColumn 함수를 사용하여 UDF 함수를 호출 한 열로 선택된 열을 조합 붙박이 기능 구조체 및 UDF 함수에 전달
df.withColumn("contcatenated", combineUdf(struct(col("col1"), col("col2"), col("col3")))).show(false) // +----+----+----+-------------+ // |col1|col2|col3|contcatenated| // +----+----+----+-------------+ // |a |b |c |a, b, c | // |a1 |b1 |c1 |a1, b1, c1 | // +----+----+----+-------------+
당신은 행 인수로 전체 열을 전달하는 데 사용할 수 있음을 볼 수 있도록
당신은 한 번에 한 행의 모든 열을 전달할 수 있습니다
val columns = df.columns df.withColumn("contcatenated", combineUdf(struct(columns.map(col): _*)))
업데이트
당신은 당신이하는 것처럼 UDF 함수를 등록 할 필요도 SQL 쿼리와 같은 얻을 수 있습니다
df.createOrReplaceTempView("tempview") sqlContext.udf.register("combineUdf", combineUdf) sqlContext.sql("select *, combineUdf(struct(`col1`, `col2`, `col3`)) as concatenated from tempview")
그것은 당신에게 상기와 같은 결과를 줄 것이다
당신이 열 이름을 하드 코딩하지 않으려는 이제 경우에 당신은 당신의 욕망에 따라 열 이름을 선택하고 문자열을 만들 수 있습니다
val columns = df.columns.map(x => "`"+x+"`").mkString(",") sqlContext.sql(s"select *, combineUdf(struct(${columns})) as concatenated from tempview")
나는 대답은 도움이 희망
-
==============================
2.내가 해결 해낸 : 새 출력 열을 생성하는 기존의 스파크 SQL 함수에 열 이름을 드롭 :
내가 해결 해낸 : 새 출력 열을 생성하는 기존의 스파크 SQL 함수에 열 이름을 드롭 :
concat(${df.columns.tail.mkString(",'-',")}) as Features
이 경우 dataframe에서 첫 번째 열은 대상 및 제외 하였다. 즉,이 접근법의 또 다른 장점 : 많은 컬럼의 실제 목록을 조작 할 수.
이 방법은 RDD / dataframes의 불필요한 구조 조정을 피할 수 있습니다.
from https://stackoverflow.com/questions/49434647/process-all-columns-the-entire-row-in-a-spark-udf by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 스파크 스칼라 앱에 대한 동일한 dataframe에서 날짜 컬럼에 일 칼럼의 번호 추가 (0) | 2019.11.18 |
---|---|
[SCALA] 자바에서 scala.None 액세스 (0) | 2019.11.18 |
[SCALA] 어떻게 CSV 파일에서 스키마를 만들고 파일에 해당 스키마 저장 / 유지하기 위해? (0) | 2019.11.18 |
[SCALA] 스파크 열 문자열 다른 열 (행)에 존재하는 경우 대체 (0) | 2019.11.18 |
[SCALA] 스칼라 단일 메소드 인터페이스 구현 (0) | 2019.11.17 |