[HADOOP] spark에서 구조체를 UDAF에 전달하십시오.
HADOOPspark에서 구조체를 UDAF에 전달하십시오.
나는 다음과 같은 스키마를 가지고있다.
root
|-- id:string (nullable = false)
|-- age: long (nullable = true)
|-- cars: struct (nullable = true)
| |-- car1: string (nullable = true)
| |-- car2: string (nullable = true)
| |-- car3: string (nullable = true)
|-- name: string (nullable = true)
나는 구조체 '자동차'를 우다프에 어떻게 전달할 수 있습니까? 만약 내가 단지 자동차 하위 구조체를 전달하려면 inputSchema가되어야합니다.
해결법
-
==============================
1.하지만 UDAF의 논리는 다를 수 있습니다. 예를 들어 두 개의 행이있는 경우 :
하지만 UDAF의 논리는 다를 수 있습니다. 예를 들어 두 개의 행이있는 경우 :
val seq = Seq(cars(cars_schema("car1", "car2", "car3")), (cars(cars_schema("car1", "car2", "car3")))) val rdd = spark.sparkContext.parallelize(seq)
여기 스키마는
root |-- cars: struct (nullable = true) | |-- car1: string (nullable = true) | |-- car2: string (nullable = true) | |-- car3: string (nullable = true)
다음 집계를 호출하려고하면 :
val df = seq.toDF df.agg(agg0(col("cars")))
다음과 같이 UDAF 입력 스키마를 변경해야합니다.
val carsSchema = StructType(List(StructField("car1", StringType, true), StructField("car2", StringType, true), StructField("car3", StringType, true)))
UDAF의 소년에게는이 스키마를 처리하여 inputSchema를 변경해야합니다.
override def inputSchema: StructType = StructType(StructField("input", carsSchema) :: Nil)
업데이트 방법에서 입력 행의 형식을 처리해야합니다. 행 :
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { val i = input.getAs[Array[Array[String]]](0) // i here would be [car1,car2,car3], an array of strings buffer(0) = ??? }
여기에서, 당신은 당신의 버퍼를 업데이트하고 병합을 완료하고 함수를 평가하도록 변환 할 수 있습니다.
from https://stackoverflow.com/questions/54518102/pass-a-struct-to-an-udaf-in-spark by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 오류 hive.HiveConfig : org.apache.hadoop.hive.conf.HiveConf를로드 할 수 없습니다. HIVE_CONF _DIR이 올바르게 설정되었는지 확인하십시오. (0) | 2019.08.01 |
---|---|
[HADOOP] 하이브의 LeaseExpiredException (0) | 2019.08.01 |
[HADOOP] Hadoop 클라이언트와 클러스터 분리 (0) | 2019.08.01 |
[HADOOP] Mongo-Hadoop 커넥터로 Apache Spark 설정 문제 (0) | 2019.08.01 |
[HADOOP] spark-shell 오류 : 스키마에 대한 FileSystem이 없습니다. wasb (0) | 2019.08.01 |