복붙노트

[SCALA] 어떻게 각 그룹의 첫 번째 행을 선택하려면?

SCALA

어떻게 각 그룹의 첫 번째 행을 선택하려면?

나는 다음과 같이 생성 된 DataFrame 있습니다 :

df.groupBy($"Hour", $"Category")
  .agg(sum($"value") as "TotalValue")
  .sort($"Hour".asc, $"TotalValue".desc))

그 결과는 다음과 같다 :

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   0|   cat26|      30.9|
|   0|   cat13|      22.1|
|   0|   cat95|      19.6|
|   0|  cat105|       1.3|
|   1|   cat67|      28.5|
|   1|    cat4|      26.8|
|   1|   cat13|      12.6|
|   1|   cat23|       5.3|
|   2|   cat56|      39.6|
|   2|   cat40|      29.7|
|   2|  cat187|      27.9|
|   2|   cat68|       9.8|
|   3|    cat8|      35.6|
| ...|    ....|      ....|
+----+--------+----------+

당신이 볼 수 있듯이, DataFrame는 내림차순으로 totalvalue가에 의해 다음, 증가하는 순서로 시간에 의해 정렬됩니다.

나는 각 그룹, 즉 상단의 행을 선택하고 싶습니다

그래서 원하는 출력은 다음과 같습니다

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   0|   cat26|      30.9|
|   1|   cat67|      28.5|
|   2|   cat56|      39.6|
|   3|    cat8|      35.6|
| ...|     ...|       ...|
+----+--------+----------+

뿐만 아니라 각 그룹의 상위 N 행을 선택 할 수있는 것이 편리 할 수 ​​있습니다.

어떤 도움을 매우 높이 평가된다.

해결법

  1. ==============================

    1.윈도우 함수 :

    윈도우 함수 :

    이런 식으로 뭔가 트릭을 수행해야합니다

    import org.apache.spark.sql.functions.{row_number, max, broadcast}
    import org.apache.spark.sql.expressions.Window
    
    val df = sc.parallelize(Seq(
      (0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3),
      (1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3),
      (2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8),
      (3,"cat8",35.6))).toDF("Hour", "Category", "TotalValue")
    
    val w = Window.partitionBy($"hour").orderBy($"TotalValue".desc)
    
    val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")
    
    dfTop.show
    // +----+--------+----------+
    // |Hour|Category|TotalValue|
    // +----+--------+----------+
    // |   0|   cat26|      30.9|
    // |   1|   cat67|      28.5|
    // |   2|   cat56|      39.6|
    // |   3|    cat8|      35.6|
    // +----+--------+----------+
    

    이 방법은 중요한 데이터 스큐의 경우 비효율적 일 것입니다.

    일반 SQL 집계는 다음에 조인

    또는 당신은 집계 된 데이터 프레임에 가입 할 수 있습니다 :

    val dfMax = df.groupBy($"hour".as("max_hour")).agg(max($"TotalValue").as("max_value"))
    
    val dfTopByJoin = df.join(broadcast(dfMax),
        ($"hour" === $"max_hour") && ($"TotalValue" === $"max_value"))
      .drop("max_hour")
      .drop("max_value")
    
    dfTopByJoin.show
    
    // +----+--------+----------+
    // |Hour|Category|TotalValue|
    // +----+--------+----------+
    // |   0|   cat26|      30.9|
    // |   1|   cat67|      28.5|
    // |   2|   cat56|      39.6|
    // |   3|    cat8|      35.6|
    // +----+--------+----------+
    

    (동일한 총 가치와 시간 당 한 개 이상의 카테고리가있는 경우)는 중복 값을 유지합니다. 다음과 같이이를 제거 할 수 있습니다 :

    dfTopByJoin
      .groupBy($"hour")
      .agg(
        first("category").alias("category"),
        first("TotalValue").alias("TotalValue"))
    

    구조체를 통해 순서를 사용 :

    깔끔한, 잘 테스트하지 않지만, 필요로하지 않는 비결은 윈도우 함수는 조인 :

    val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs"))
      .groupBy($"hour")
      .agg(max("vs").alias("vs"))
      .select($"Hour", $"vs.Category", $"vs.TotalValue")
    
    dfTop.show
    // +----+--------+----------+
    // |Hour|Category|TotalValue|
    // +----+--------+----------+
    // |   0|   cat26|      30.9|
    // |   1|   cat67|      28.5|
    // |   2|   cat56|      39.6|
    // |   3|    cat8|      35.6|
    // +----+--------+----------+
    

    데이터 집합 API를 (2.0, 1.6 스파크) :

    1.6 스파크 :

    case class Record(Hour: Integer, Category: String, TotalValue: Double)
    
    df.as[Record]
      .groupBy($"hour")
      .reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y)
      .show
    
    // +---+--------------+
    // | _1|            _2|
    // +---+--------------+
    // |[0]|[0,cat26,30.9]|
    // |[1]|[1,cat67,28.5]|
    // |[2]|[2,cat56,39.6]|
    // |[3]| [3,cat8,35.6]|
    // +---+--------------+
    

    2.0 이상 스파크 :

    df.as[Record]
      .groupByKey(_.Hour)
      .reduceGroups((x, y) => if (x.TotalValue > y.TotalValue) x else y)
    

    마지막 두 가지 방법지도면이 결합하고 대부분의 시간이 더 나은 성능이 윈도우 함수에 비해 조인 전시해야하므로 전체 셔플을 필요로하지 않는 활용할 수 있습니다. 이 지팡이는 완료 출력 모드에서 구조적 스트리밍을 사용할 수.

    사용하지 마십시오 :

    df.orderBy(...).groupBy(...).agg(first(...), ...)
    

    (특히 로컬 모드에서) 제대로 작동 할 수 있지만 (SPARK-16207) 신뢰할 수 없다. 관련 JIRA 문제를 연결하기위한 Tzach 소할에 크레딧.

    같은 노트에 적용

    df.orderBy(...).dropDuplicates(...)
    

    이는 내부 등가 실행 계획을 사용한다.

  2. ==============================

    2.여러 열로 그룹화와 스파크 2.0.2의 경우 :

    여러 열로 그룹화와 스파크 2.0.2의 경우 :

    import org.apache.spark.sql.functions.row_number
    import org.apache.spark.sql.expressions.Window
    
    val w = Window.partitionBy($"col1", $"col2", $"col3").orderBy($"timestamp".desc)
    
    val refined_df = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")
    
  3. ==============================

    3.이 zero323의 대답하지만, SQL 쿼리 방법으로 정확히 동일합니다.

    이 zero323의 대답하지만, SQL 쿼리 방법으로 정확히 동일합니다.

    그 dataframe 가정하면 생성하고 등록

    df.createOrReplaceTempView("table")
    //+----+--------+----------+
    //|Hour|Category|TotalValue|
    //+----+--------+----------+
    //|0   |cat26   |30.9      |
    //|0   |cat13   |22.1      |
    //|0   |cat95   |19.6      |
    //|0   |cat105  |1.3       |
    //|1   |cat67   |28.5      |
    //|1   |cat4    |26.8      |
    //|1   |cat13   |12.6      |
    //|1   |cat23   |5.3       |
    //|2   |cat56   |39.6      |
    //|2   |cat40   |29.7      |
    //|2   |cat187  |27.9      |
    //|2   |cat68   |9.8       |
    //|3   |cat8    |35.6      |
    //+----+--------+----------+
    

    창 기능 :

    sqlContext.sql("select Hour, Category, TotalValue from (select *, row_number() OVER (PARTITION BY Hour ORDER BY TotalValue DESC) as rn  FROM table) tmp where rn = 1").show(false)
    //+----+--------+----------+
    //|Hour|Category|TotalValue|
    //+----+--------+----------+
    //|1   |cat67   |28.5      |
    //|3   |cat8    |35.6      |
    //|2   |cat56   |39.6      |
    //|0   |cat26   |30.9      |
    //+----+--------+----------+
    

    일반 SQL 집계는 다음에 조인

    sqlContext.sql("select Hour, first(Category) as Category, first(TotalValue) as TotalValue from " +
      "(select Hour, Category, TotalValue from table tmp1 " +
      "join " +
      "(select Hour as max_hour, max(TotalValue) as max_value from table group by Hour) tmp2 " +
      "on " +
      "tmp1.Hour = tmp2.max_hour and tmp1.TotalValue = tmp2.max_value) tmp3 " +
      "group by tmp3.Hour")
      .show(false)
    //+----+--------+----------+
    //|Hour|Category|TotalValue|
    //+----+--------+----------+
    //|1   |cat67   |28.5      |
    //|3   |cat8    |35.6      |
    //|2   |cat56   |39.6      |
    //|0   |cat26   |30.9      |
    //+----+--------+----------+
    

    구조체를 통해 순서를 사용 :

    sqlContext.sql("select Hour, vs.Category, vs.TotalValue from (select Hour, max(struct(TotalValue, Category)) as vs from table group by Hour)").show(false)
    //+----+--------+----------+
    //|Hour|Category|TotalValue|
    //+----+--------+----------+
    //|1   |cat67   |28.5      |
    //|3   |cat8    |35.6      |
    //|2   |cat56   |39.6      |
    //|0   |cat26   |30.9      |
    //+----+--------+----------+
    

    데이터 집합 방법 및 도스는 원래의 대답과 동일하지 않습니다

  4. ==============================

    4.이 솔루션은 아래 하나의 GROUPBY을 수행하고 한 번에 MAXVALUE를가 포함 된 dataframe의 행을 추출합니다. 더에 대한 필요가 조인하지, 또는 Windows.

    이 솔루션은 아래 하나의 GROUPBY을 수행하고 한 번에 MAXVALUE를가 포함 된 dataframe의 행을 추출합니다. 더에 대한 필요가 조인하지, 또는 Windows.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    import org.apache.spark.sql.DataFrame
    
    //df is the dataframe with Day, Category, TotalValue
    
    implicit val dfEnc = RowEncoder(df.schema)
    
    val res: DataFrame = df.groupByKey{(r) => r.getInt(0)}.mapGroups[Row]{(day: Int, rows: Iterator[Row]) => i.maxBy{(r) => r.getDouble(2)}}
    
  5. ==============================

    5.dataframe API를 사용하여이 작업을 수행하는 좋은 방법은 지금처럼 argmax 로직을 사용하고 있습니다

    dataframe API를 사용하여이 작업을 수행하는 좋은 방법은 지금처럼 argmax 로직을 사용하고 있습니다

      val df = Seq(
        (0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3),
        (1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3),
        (2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8),
        (3,"cat8",35.6)).toDF("Hour", "Category", "TotalValue")
    
      df.groupBy($"Hour")
        .agg(max(struct($"TotalValue", $"Category")).as("argmax"))
        .select($"Hour", $"argmax.*").show
    
     +----+----------+--------+
     |Hour|TotalValue|Category|
     +----+----------+--------+
     |   1|      28.5|   cat67|
     |   3|      35.6|    cat8|
     |   2|      39.6|   cat56|
     |   0|      30.9|   cat26|
     +----+----------+--------+
    
  6. ==============================

    6.패턴은 키에 의해 그룹 => 각 그룹 예에 뭔가를 감소 => dataframe에 반환

    패턴은 키에 의해 그룹 => 각 그룹 예에 뭔가를 감소 => dataframe에 반환

    그래서 내가 RDD 기능을 사용 Dataframe 추상화가이 경우 조금 복잡 생각

     val rdd: RDD[Row] = originalDf
      .rdd
      .groupBy(row => row.getAs[String]("grouping_row"))
      .map(iterableTuple => {
        iterableTuple._2.reduce(reduceFunction)
      })
    
    val productDf = sqlContext.createDataFrame(rdd, originalDf.schema)
    
  7. ==============================

    7.dataframe 여러 항목을 그룹화 할 경우,이 도움이 될 수 있습니다

    dataframe 여러 항목을 그룹화 할 경우,이 도움이 될 수 있습니다

    val keys = List("Hour", "Category");
     val selectFirstValueOfNoneGroupedColumns = 
     df.columns
       .filterNot(keys.toSet)
       .map(_ -> "first").toMap
     val grouped = 
     df.groupBy(keys.head, keys.tail: _*)
       .agg(selectFirstValueOfNoneGroupedColumns)
    

    이 비슷한 문제를 가진 사람을 도움이되기를 바랍니다

  8. ==============================

    8.여기에서 다음과 같이 할 수있다 -

    여기에서 다음과 같이 할 수있다 -

       val data = df.groupBy("Hour").agg(first("Hour").as("_1"),first("Category").as("Category"),first("TotalValue").as("TotalValue")).drop("Hour")
    
    data.withColumnRenamed("_1","Hour").show
    
  9. ==============================

    9.우리는 순위 () 윈도우 기능을 사용할 수 있습니다 (당신이 순위를 선택할 것 곳 = 1) 순위는 그룹의 모든 행의 수를 (이 경우는 시간이 될 것입니다) 추가

    우리는 순위 () 윈도우 기능을 사용할 수 있습니다 (당신이 순위를 선택할 것 곳 = 1) 순위는 그룹의 모든 행의 수를 (이 경우는 시간이 될 것입니다) 추가

    여기에 예입니다. (행 https://github.com/jaceklaskowski/mastering-apache-spark-book/blob/master/spark-sql-functions.adoc#rank)

    val dataset = spark.range(9).withColumn("bucket", 'id % 3)
    
    import org.apache.spark.sql.expressions.Window
    val byBucket = Window.partitionBy('bucket).orderBy('id)
    
    scala> dataset.withColumn("rank", rank over byBucket).show
    +---+------+----+
    | id|bucket|rank|
    +---+------+----+
    |  0|     0|   1|
    |  3|     0|   2|
    |  6|     0|   3|
    |  1|     1|   1|
    |  4|     1|   2|
    |  7|     1|   3|
    |  2|     2|   1|
    |  5|     2|   2|
    |  8|     2|   3|
    +---+------+----+
    
  10. from https://stackoverflow.com/questions/33878370/how-to-select-the-first-row-of-each-group by cc-by-sa and MIT license