복붙노트

[SCALA] 어떻게 GROUPBY 후 수집에 값을 집계하는?

SCALA

어떻게 GROUPBY 후 수집에 값을 집계하는?

나는 다음과 같은 스키마와 dataframe 있습니다 :

[visitorId: string, trackingIds: array<string>, emailIds: array<string>]

(아니면 롤업?) 그룹에있는 방법을 찾고 trackingIds 및 emailIds 열이 함께 추가 할 visitorid하여이 dataframe. 그래서 예를 들어 내 초기 DF 같은 보이는 경우 :

visitorId   |trackingIds|emailIds
+-----------+------------+--------
|a158|      [666b]      |    [12]
|7g21|      [c0b5]      |    [45]
|7g21|      [c0b4]      |    [87]
|a158|      [666b, 777c]|    []

I는 다음과 같이 내 출력 DF 싶습니다

visitorId   |trackingIds|emailIds
+-----------+------------+--------
|a158|      [666b,666b,777c]|      [12,'']
|7g21|      [c0b5,c0b4]     |      [45, 87]

GROUPBY 및 AGG 연산자를 사용하지만, 많은 행운을 가지고 있지 시도.

해결법

  1. ==============================

    1.스파크> = 2.4

    스파크> = 2.4

    당신은 내장 평평 기능 UDF를 평평하게 교체 할 수 있습니다

    import org.apache.spark.sql.functions.flatten
    

    -이기 때문에 나머지를 떠나.

    스파크> = 2.0 <2.4

    그것은 가능하지만, 매우 비싸다. 당신이 제공 한 데이터를 사용하여 :

    case class Record(
        visitorId: String, trackingIds: Array[String], emailIds: Array[String])
    
    val df = Seq(
      Record("a158", Array("666b"), Array("12")),
      Record("7g21", Array("c0b5"), Array("45")),
      Record("7g21", Array("c0b4"), Array("87")),
      Record("a158", Array("666b",  "777c"), Array.empty[String])).toDF
    

    및 도우미 기능 :

    import org.apache.spark.sql.functions.udf
    
    val flatten = udf((xs: Seq[Seq[String]]) => xs.flatten)
    

    우리는 자리로 공백을 채울 수 있습니다 :

    import org.apache.spark.sql.functions.{array, lit, when}
    
    val dfWithPlaceholders = df.withColumn(
      "emailIds", 
      when(size($"emailIds") === 0, array(lit(""))).otherwise($"emailIds"))
    

    collect_lists 및 평평 :

    import org.apache.spark.sql.functions.{array, collect_list}
    
    val emailIds = flatten(collect_list($"emailIds")).alias("emailIds")
    val trackingIds = flatten(collect_list($"trackingIds")).alias("trackingIds")
    
    df
      .groupBy($"visitorId")
      .agg(trackingIds, emailIds)
    
    // +---------+------------------+--------+
    // |visitorId|       trackingIds|emailIds|
    // +---------+------------------+--------+
    // |     a158|[666b, 666b, 777c]|  [12, ]|
    // |     7g21|      [c0b5, c0b4]|[45, 87]|
    // +---------+------------------+--------+
    

    정적으로 입력 된 데이터 집합으로 :

    df.as[Record]
      .groupByKey(_.visitorId)
      .mapGroups { case (key, vs) => 
        vs.map(v => (v.trackingIds, v.emailIds)).toArray.unzip match {
          case (trackingIds, emailIds) => 
            Record(key, trackingIds.flatten, emailIds.flatten)
      }}
    
    // +---------+------------------+--------+
    // |visitorId|       trackingIds|emailIds|
    // +---------+------------------+--------+
    // |     a158|[666b, 666b, 777c]|  [12, ]|
    // |     7g21|      [c0b5, c0b4]|[45, 87]|
    // +---------+------------------+--------+
    

    스파크 1.x에서

    당신은 RDD 및 그룹으로 변환 할 수 있습니다

    import org.apache.spark.sql.Row
    
    dfWithPlaceholders.rdd
      .map {
         case Row(id: String, 
           trcks: Seq[String @ unchecked],
           emails: Seq[String @ unchecked]) => (id, (trcks, emails))
      }
      .groupByKey
      .map {case (key, vs) => vs.toArray.unzip match {
        case (trackingIds, emailIds) => 
          Record(key, trackingIds.flatten, emailIds.flatten)
      }}
      .toDF
    
    // +---------+------------------+--------+
    // |visitorId|       trackingIds|emailIds|
    // +---------+------------------+--------+
    // |     7g21|      [c0b5, c0b4]|[45, 87]|
    // |     a158|[666b, 666b, 777c]|  [12, ]|
    // +---------+------------------+--------+
    
  2. ==============================

    2.@ zero323의 대답은 거의 완료,하지만 불꽃은 우리에게 더 많은 유연성을 제공합니다. 어떻게 다음과 같은 솔루션에 대해?

    @ zero323의 대답은 거의 완료,하지만 불꽃은 우리에게 더 많은 유연성을 제공합니다. 어떻게 다음과 같은 솔루션에 대해?

    import org.apache.spark.sql.functions._
    inventory
      .select($"*", explode($"trackingIds") as "tracking_id")
      .select($"*", explode($"emailIds") as "email_id")
      .groupBy("visitorId")
      .agg(
        collect_list("tracking_id") as "trackingIds",
        collect_list("email_id") as "emailIds")
    

    (개선의 여지가 그래서 :)) 즉 그러나 모든 빈 컬렉션을 잎

  3. ==============================

    3.당신은 사용자 정의 집계 기능을 사용할 수 있습니다.

    당신은 사용자 정의 집계 기능을 사용할 수 있습니다.

    1) customAggregation라는 스칼라 클래스를 사용 UDAF 정의를 작성합니다.

    package com.package.name
    
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._
    import scala.collection.JavaConverters._
    
    class CustomAggregation() extends UserDefinedAggregateFunction {
    
    // Input Data Type Schema
    def inputSchema: StructType = StructType(Array(StructField("col5", ArrayType(StringType))))
    
    // Intermediate Schema
    def bufferSchema = StructType(Array(
    StructField("col5_collapsed",  ArrayType(StringType))))
    
    // Returned Data Type .
    def dataType: DataType = ArrayType(StringType)
    
    // Self-explaining
    def deterministic = true
    
    // This function is called whenever key changes
    def initialize(buffer: MutableAggregationBuffer) = {
    buffer(0) = Array.empty[String] // initialize array
    }
    
    // Iterate over each entry of a group
    def update(buffer: MutableAggregationBuffer, input: Row) = {
    buffer(0) =
      if(!input.isNullAt(0))
        buffer.getList[String](0).toArray ++ input.getList[String](0).toArray
      else
        buffer.getList[String](0).toArray
    }
    
      // Merge two partial aggregates
     def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
     buffer1(0) = buffer1.getList[String](0).toArray ++ buffer2.getList[String](0).toArray
    }
    
     // Called after all the entries are exhausted.
     def evaluate(buffer: Row) = {
      buffer.getList[String](0).asScala.toList.distinct
     }
    }
    

    2) 다음과 같은 코드에서 UDAF를 사용

    //define UDAF
    val CustomAggregation = new CustomAggregation()
    DataFrame
        .groupBy(col1,col2,col3)
        .agg(CustomAggregation(DataFrame(col5))).show()
    
  4. from https://stackoverflow.com/questions/34202997/how-to-aggregate-values-into-collection-after-groupby by cc-by-sa and MIT license