[SCALA] 어떻게 각 그룹의 첫 번째 행을 선택하려면?
SCALA어떻게 각 그룹의 첫 번째 행을 선택하려면?
나는 다음과 같이 생성 된 DataFrame 있습니다 :
df.groupBy($"Hour", $"Category")
.agg(sum($"value") as "TotalValue")
.sort($"Hour".asc, $"TotalValue".desc))
그 결과는 다음과 같다 :
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
| 3| cat8| 35.6|
| ...| ....| ....|
+----+--------+----------+
당신이 볼 수 있듯이, DataFrame는 내림차순으로 totalvalue가에 의해 다음, 증가하는 순서로 시간에 의해 정렬됩니다.
나는 각 그룹, 즉 상단의 행을 선택하고 싶습니다
그래서 원하는 출력은 다음과 같습니다
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 1| cat67| 28.5|
| 2| cat56| 39.6|
| 3| cat8| 35.6|
| ...| ...| ...|
+----+--------+----------+
뿐만 아니라 각 그룹의 상위 N 행을 선택 할 수있는 것이 편리 할 수 있습니다.
어떤 도움을 매우 높이 평가된다.
해결법
-
==============================
1.윈도우 함수 :
윈도우 함수 :
이런 식으로 뭔가 트릭을 수행해야합니다
import org.apache.spark.sql.functions.{row_number, max, broadcast} import org.apache.spark.sql.expressions.Window val df = sc.parallelize(Seq( (0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3), (1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3), (2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8), (3,"cat8",35.6))).toDF("Hour", "Category", "TotalValue") val w = Window.partitionBy($"hour").orderBy($"TotalValue".desc) val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn") dfTop.show // +----+--------+----------+ // |Hour|Category|TotalValue| // +----+--------+----------+ // | 0| cat26| 30.9| // | 1| cat67| 28.5| // | 2| cat56| 39.6| // | 3| cat8| 35.6| // +----+--------+----------+
이 방법은 중요한 데이터 스큐의 경우 비효율적 일 것입니다.
일반 SQL 집계는 다음에 조인
또는 당신은 집계 된 데이터 프레임에 가입 할 수 있습니다 :
val dfMax = df.groupBy($"hour".as("max_hour")).agg(max($"TotalValue").as("max_value")) val dfTopByJoin = df.join(broadcast(dfMax), ($"hour" === $"max_hour") && ($"TotalValue" === $"max_value")) .drop("max_hour") .drop("max_value") dfTopByJoin.show // +----+--------+----------+ // |Hour|Category|TotalValue| // +----+--------+----------+ // | 0| cat26| 30.9| // | 1| cat67| 28.5| // | 2| cat56| 39.6| // | 3| cat8| 35.6| // +----+--------+----------+
(동일한 총 가치와 시간 당 한 개 이상의 카테고리가있는 경우)는 중복 값을 유지합니다. 다음과 같이이를 제거 할 수 있습니다 :
dfTopByJoin .groupBy($"hour") .agg( first("category").alias("category"), first("TotalValue").alias("TotalValue"))
구조체를 통해 순서를 사용 :
깔끔한, 잘 테스트하지 않지만, 필요로하지 않는 비결은 윈도우 함수는 조인 :
val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs")) .groupBy($"hour") .agg(max("vs").alias("vs")) .select($"Hour", $"vs.Category", $"vs.TotalValue") dfTop.show // +----+--------+----------+ // |Hour|Category|TotalValue| // +----+--------+----------+ // | 0| cat26| 30.9| // | 1| cat67| 28.5| // | 2| cat56| 39.6| // | 3| cat8| 35.6| // +----+--------+----------+
데이터 집합 API를 (2.0, 1.6 스파크) :
1.6 스파크 :
case class Record(Hour: Integer, Category: String, TotalValue: Double) df.as[Record] .groupBy($"hour") .reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y) .show // +---+--------------+ // | _1| _2| // +---+--------------+ // |[0]|[0,cat26,30.9]| // |[1]|[1,cat67,28.5]| // |[2]|[2,cat56,39.6]| // |[3]| [3,cat8,35.6]| // +---+--------------+
2.0 이상 스파크 :
df.as[Record] .groupByKey(_.Hour) .reduceGroups((x, y) => if (x.TotalValue > y.TotalValue) x else y)
마지막 두 가지 방법지도면이 결합하고 대부분의 시간이 더 나은 성능이 윈도우 함수에 비해 조인 전시해야하므로 전체 셔플을 필요로하지 않는 활용할 수 있습니다. 이 지팡이는 완료 출력 모드에서 구조적 스트리밍을 사용할 수.
사용하지 마십시오 :
df.orderBy(...).groupBy(...).agg(first(...), ...)
(특히 로컬 모드에서) 제대로 작동 할 수 있지만 (SPARK-16207) 신뢰할 수 없다. 관련 JIRA 문제를 연결하기위한 Tzach 소할에 크레딧.
같은 노트에 적용
df.orderBy(...).dropDuplicates(...)
이는 내부 등가 실행 계획을 사용한다.
-
==============================
2.여러 열로 그룹화와 스파크 2.0.2의 경우 :
여러 열로 그룹화와 스파크 2.0.2의 경우 :
import org.apache.spark.sql.functions.row_number import org.apache.spark.sql.expressions.Window val w = Window.partitionBy($"col1", $"col2", $"col3").orderBy($"timestamp".desc) val refined_df = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")
-
==============================
3.이 zero323의 대답하지만, SQL 쿼리 방법으로 정확히 동일합니다.
이 zero323의 대답하지만, SQL 쿼리 방법으로 정확히 동일합니다.
그 dataframe 가정하면 생성하고 등록
df.createOrReplaceTempView("table") //+----+--------+----------+ //|Hour|Category|TotalValue| //+----+--------+----------+ //|0 |cat26 |30.9 | //|0 |cat13 |22.1 | //|0 |cat95 |19.6 | //|0 |cat105 |1.3 | //|1 |cat67 |28.5 | //|1 |cat4 |26.8 | //|1 |cat13 |12.6 | //|1 |cat23 |5.3 | //|2 |cat56 |39.6 | //|2 |cat40 |29.7 | //|2 |cat187 |27.9 | //|2 |cat68 |9.8 | //|3 |cat8 |35.6 | //+----+--------+----------+
창 기능 :
sqlContext.sql("select Hour, Category, TotalValue from (select *, row_number() OVER (PARTITION BY Hour ORDER BY TotalValue DESC) as rn FROM table) tmp where rn = 1").show(false) //+----+--------+----------+ //|Hour|Category|TotalValue| //+----+--------+----------+ //|1 |cat67 |28.5 | //|3 |cat8 |35.6 | //|2 |cat56 |39.6 | //|0 |cat26 |30.9 | //+----+--------+----------+
일반 SQL 집계는 다음에 조인
sqlContext.sql("select Hour, first(Category) as Category, first(TotalValue) as TotalValue from " + "(select Hour, Category, TotalValue from table tmp1 " + "join " + "(select Hour as max_hour, max(TotalValue) as max_value from table group by Hour) tmp2 " + "on " + "tmp1.Hour = tmp2.max_hour and tmp1.TotalValue = tmp2.max_value) tmp3 " + "group by tmp3.Hour") .show(false) //+----+--------+----------+ //|Hour|Category|TotalValue| //+----+--------+----------+ //|1 |cat67 |28.5 | //|3 |cat8 |35.6 | //|2 |cat56 |39.6 | //|0 |cat26 |30.9 | //+----+--------+----------+
구조체를 통해 순서를 사용 :
sqlContext.sql("select Hour, vs.Category, vs.TotalValue from (select Hour, max(struct(TotalValue, Category)) as vs from table group by Hour)").show(false) //+----+--------+----------+ //|Hour|Category|TotalValue| //+----+--------+----------+ //|1 |cat67 |28.5 | //|3 |cat8 |35.6 | //|2 |cat56 |39.6 | //|0 |cat26 |30.9 | //+----+--------+----------+
데이터 집합 방법 및 도스는 원래의 대답과 동일하지 않습니다
-
==============================
4.이 솔루션은 아래 하나의 GROUPBY을 수행하고 한 번에 MAXVALUE를가 포함 된 dataframe의 행을 추출합니다. 더에 대한 필요가 조인하지, 또는 Windows.
이 솔루션은 아래 하나의 GROUPBY을 수행하고 한 번에 MAXVALUE를가 포함 된 dataframe의 행을 추출합니다. 더에 대한 필요가 조인하지, 또는 Windows.
import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.DataFrame //df is the dataframe with Day, Category, TotalValue implicit val dfEnc = RowEncoder(df.schema) val res: DataFrame = df.groupByKey{(r) => r.getInt(0)}.mapGroups[Row]{(day: Int, rows: Iterator[Row]) => i.maxBy{(r) => r.getDouble(2)}}
-
==============================
5.dataframe API를 사용하여이 작업을 수행하는 좋은 방법은 지금처럼 argmax 로직을 사용하고 있습니다
dataframe API를 사용하여이 작업을 수행하는 좋은 방법은 지금처럼 argmax 로직을 사용하고 있습니다
val df = Seq( (0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3), (1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3), (2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8), (3,"cat8",35.6)).toDF("Hour", "Category", "TotalValue") df.groupBy($"Hour") .agg(max(struct($"TotalValue", $"Category")).as("argmax")) .select($"Hour", $"argmax.*").show +----+----------+--------+ |Hour|TotalValue|Category| +----+----------+--------+ | 1| 28.5| cat67| | 3| 35.6| cat8| | 2| 39.6| cat56| | 0| 30.9| cat26| +----+----------+--------+
-
==============================
6.패턴은 키에 의해 그룹 => 각 그룹 예에 뭔가를 감소 => dataframe에 반환
패턴은 키에 의해 그룹 => 각 그룹 예에 뭔가를 감소 => dataframe에 반환
그래서 내가 RDD 기능을 사용 Dataframe 추상화가이 경우 조금 복잡 생각
val rdd: RDD[Row] = originalDf .rdd .groupBy(row => row.getAs[String]("grouping_row")) .map(iterableTuple => { iterableTuple._2.reduce(reduceFunction) }) val productDf = sqlContext.createDataFrame(rdd, originalDf.schema)
-
==============================
7.dataframe 여러 항목을 그룹화 할 경우,이 도움이 될 수 있습니다
dataframe 여러 항목을 그룹화 할 경우,이 도움이 될 수 있습니다
val keys = List("Hour", "Category"); val selectFirstValueOfNoneGroupedColumns = df.columns .filterNot(keys.toSet) .map(_ -> "first").toMap val grouped = df.groupBy(keys.head, keys.tail: _*) .agg(selectFirstValueOfNoneGroupedColumns)
이 비슷한 문제를 가진 사람을 도움이되기를 바랍니다
-
==============================
8.여기에서 다음과 같이 할 수있다 -
여기에서 다음과 같이 할 수있다 -
val data = df.groupBy("Hour").agg(first("Hour").as("_1"),first("Category").as("Category"),first("TotalValue").as("TotalValue")).drop("Hour") data.withColumnRenamed("_1","Hour").show
-
==============================
9.우리는 순위 () 윈도우 기능을 사용할 수 있습니다 (당신이 순위를 선택할 것 곳 = 1) 순위는 그룹의 모든 행의 수를 (이 경우는 시간이 될 것입니다) 추가
우리는 순위 () 윈도우 기능을 사용할 수 있습니다 (당신이 순위를 선택할 것 곳 = 1) 순위는 그룹의 모든 행의 수를 (이 경우는 시간이 될 것입니다) 추가
여기에 예입니다. (행 https://github.com/jaceklaskowski/mastering-apache-spark-book/blob/master/spark-sql-functions.adoc#rank)
val dataset = spark.range(9).withColumn("bucket", 'id % 3) import org.apache.spark.sql.expressions.Window val byBucket = Window.partitionBy('bucket).orderBy('id) scala> dataset.withColumn("rank", rank over byBucket).show +---+------+----+ | id|bucket|rank| +---+------+----+ | 0| 0| 1| | 3| 0| 2| | 6| 0| 3| | 1| 1| 1| | 4| 1| 2| | 7| 1| 3| | 2| 2| 1| | 5| 2| 2| | 8| 2| 3| +---+------+----+
from https://stackoverflow.com/questions/33878370/how-to-select-the-first-row-of-each-group by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 어떻게 스칼라의 타입 삭제 주위를받을 수 있나요? 또는, 왜 나는 내 컬렉션의 형식 매개 변수를 얻을 수 없다? (0) | 2019.10.28 |
---|---|
[SCALA] 아파치 스파크의 의존성 문제를 해결 (0) | 2019.10.28 |
[SCALA] 스칼라의 방법 및 기능의 차이점 (0) | 2019.10.28 |
[SCALA] 스칼라에서 밑줄의 모든 용도는 무엇입니까? (0) | 2019.10.28 |
[SCALA] 어떻게 스파크 DataFrame를 피벗? (0) | 2019.10.28 |