복붙노트

[HADOOP] spark에서 구조체를 UDAF에 전달하십시오.

HADOOP

spark에서 구조체를 UDAF에 전달하십시오.

나는 다음과 같은 스키마를 가지고있다.

root
 |-- id:string (nullable = false)
 |-- age: long (nullable = true)
 |-- cars: struct (nullable = true)
 |    |-- car1: string (nullable = true)
 |    |-- car2: string (nullable = true)
 |    |-- car3: string (nullable = true)
 |-- name: string (nullable = true)

나는 구조체 '자동차'를 우다프에 어떻게 전달할 수 있습니까? 만약 내가 단지 자동차 하위 구조체를 전달하려면 inputSchema가되어야합니다.

해결법

  1. ==============================

    1.하지만 UDAF의 논리는 다를 수 있습니다. 예를 들어 두 개의 행이있는 경우 :

    하지만 UDAF의 논리는 다를 수 있습니다. 예를 들어 두 개의 행이있는 경우 :

    val seq = Seq(cars(cars_schema("car1", "car2", "car3")), (cars(cars_schema("car1", "car2", "car3"))))
    
    val rdd = spark.sparkContext.parallelize(seq)
    

    여기 스키마는

    root
     |-- cars: struct (nullable = true)
     |    |-- car1: string (nullable = true)
     |    |-- car2: string (nullable = true)
     |    |-- car3: string (nullable = true)
    

    다음 집계를 호출하려고하면 :

    val df = seq.toDF
    df.agg(agg0(col("cars")))
    

    다음과 같이 UDAF 입력 스키마를 변경해야합니다.

    val carsSchema =
        StructType(List(StructField("car1", StringType, true), StructField("car2", StringType, true), StructField("car3", StringType, true)))
    

    UDAF의 소년에게는이 스키마를 처리하여 inputSchema를 변경해야합니다.

    override def inputSchema: StructType = StructType(StructField("input", carsSchema) :: Nil)
    

    업데이트 방법에서 입력 행의 형식을 처리해야합니다. 행 :

    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      val i = input.getAs[Array[Array[String]]](0)
      // i here would be [car1,car2,car3],  an array of strings
      buffer(0) = ???
    }
    

    여기에서, 당신은 당신의 버퍼를 업데이트하고 병합을 완료하고 함수를 평가하도록 변환 할 수 있습니다.

  2. from https://stackoverflow.com/questions/54518102/pass-a-struct-to-an-udaf-in-spark by cc-by-sa and MIT license