[SCALA] 어떻게 GROUPBY 후 수집에 값을 집계하는?
SCALA어떻게 GROUPBY 후 수집에 값을 집계하는?
나는 다음과 같은 스키마와 dataframe 있습니다 :
[visitorId: string, trackingIds: array<string>, emailIds: array<string>]
(아니면 롤업?) 그룹에있는 방법을 찾고 trackingIds 및 emailIds 열이 함께 추가 할 visitorid하여이 dataframe. 그래서 예를 들어 내 초기 DF 같은 보이는 경우 :
visitorId |trackingIds|emailIds
+-----------+------------+--------
|a158| [666b] | [12]
|7g21| [c0b5] | [45]
|7g21| [c0b4] | [87]
|a158| [666b, 777c]| []
I는 다음과 같이 내 출력 DF 싶습니다
visitorId |trackingIds|emailIds
+-----------+------------+--------
|a158| [666b,666b,777c]| [12,'']
|7g21| [c0b5,c0b4] | [45, 87]
GROUPBY 및 AGG 연산자를 사용하지만, 많은 행운을 가지고 있지 시도.
해결법
-
==============================
1.스파크> = 2.4
스파크> = 2.4
당신은 내장 평평 기능 UDF를 평평하게 교체 할 수 있습니다
import org.apache.spark.sql.functions.flatten
-이기 때문에 나머지를 떠나.
스파크> = 2.0 <2.4
그것은 가능하지만, 매우 비싸다. 당신이 제공 한 데이터를 사용하여 :
case class Record( visitorId: String, trackingIds: Array[String], emailIds: Array[String]) val df = Seq( Record("a158", Array("666b"), Array("12")), Record("7g21", Array("c0b5"), Array("45")), Record("7g21", Array("c0b4"), Array("87")), Record("a158", Array("666b", "777c"), Array.empty[String])).toDF
및 도우미 기능 :
import org.apache.spark.sql.functions.udf val flatten = udf((xs: Seq[Seq[String]]) => xs.flatten)
우리는 자리로 공백을 채울 수 있습니다 :
import org.apache.spark.sql.functions.{array, lit, when} val dfWithPlaceholders = df.withColumn( "emailIds", when(size($"emailIds") === 0, array(lit(""))).otherwise($"emailIds"))
collect_lists 및 평평 :
import org.apache.spark.sql.functions.{array, collect_list} val emailIds = flatten(collect_list($"emailIds")).alias("emailIds") val trackingIds = flatten(collect_list($"trackingIds")).alias("trackingIds") df .groupBy($"visitorId") .agg(trackingIds, emailIds) // +---------+------------------+--------+ // |visitorId| trackingIds|emailIds| // +---------+------------------+--------+ // | a158|[666b, 666b, 777c]| [12, ]| // | 7g21| [c0b5, c0b4]|[45, 87]| // +---------+------------------+--------+
정적으로 입력 된 데이터 집합으로 :
df.as[Record] .groupByKey(_.visitorId) .mapGroups { case (key, vs) => vs.map(v => (v.trackingIds, v.emailIds)).toArray.unzip match { case (trackingIds, emailIds) => Record(key, trackingIds.flatten, emailIds.flatten) }} // +---------+------------------+--------+ // |visitorId| trackingIds|emailIds| // +---------+------------------+--------+ // | a158|[666b, 666b, 777c]| [12, ]| // | 7g21| [c0b5, c0b4]|[45, 87]| // +---------+------------------+--------+
스파크 1.x에서
당신은 RDD 및 그룹으로 변환 할 수 있습니다
import org.apache.spark.sql.Row dfWithPlaceholders.rdd .map { case Row(id: String, trcks: Seq[String @ unchecked], emails: Seq[String @ unchecked]) => (id, (trcks, emails)) } .groupByKey .map {case (key, vs) => vs.toArray.unzip match { case (trackingIds, emailIds) => Record(key, trackingIds.flatten, emailIds.flatten) }} .toDF // +---------+------------------+--------+ // |visitorId| trackingIds|emailIds| // +---------+------------------+--------+ // | 7g21| [c0b5, c0b4]|[45, 87]| // | a158|[666b, 666b, 777c]| [12, ]| // +---------+------------------+--------+
-
==============================
2.@ zero323의 대답은 거의 완료,하지만 불꽃은 우리에게 더 많은 유연성을 제공합니다. 어떻게 다음과 같은 솔루션에 대해?
@ zero323의 대답은 거의 완료,하지만 불꽃은 우리에게 더 많은 유연성을 제공합니다. 어떻게 다음과 같은 솔루션에 대해?
import org.apache.spark.sql.functions._ inventory .select($"*", explode($"trackingIds") as "tracking_id") .select($"*", explode($"emailIds") as "email_id") .groupBy("visitorId") .agg( collect_list("tracking_id") as "trackingIds", collect_list("email_id") as "emailIds")
(개선의 여지가 그래서 :)) 즉 그러나 모든 빈 컬렉션을 잎
-
==============================
3.당신은 사용자 정의 집계 기능을 사용할 수 있습니다.
당신은 사용자 정의 집계 기능을 사용할 수 있습니다.
1) customAggregation라는 스칼라 클래스를 사용 UDAF 정의를 작성합니다.
package com.package.name import org.apache.spark.sql.Row import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types._ import scala.collection.JavaConverters._ class CustomAggregation() extends UserDefinedAggregateFunction { // Input Data Type Schema def inputSchema: StructType = StructType(Array(StructField("col5", ArrayType(StringType)))) // Intermediate Schema def bufferSchema = StructType(Array( StructField("col5_collapsed", ArrayType(StringType)))) // Returned Data Type . def dataType: DataType = ArrayType(StringType) // Self-explaining def deterministic = true // This function is called whenever key changes def initialize(buffer: MutableAggregationBuffer) = { buffer(0) = Array.empty[String] // initialize array } // Iterate over each entry of a group def update(buffer: MutableAggregationBuffer, input: Row) = { buffer(0) = if(!input.isNullAt(0)) buffer.getList[String](0).toArray ++ input.getList[String](0).toArray else buffer.getList[String](0).toArray } // Merge two partial aggregates def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { buffer1(0) = buffer1.getList[String](0).toArray ++ buffer2.getList[String](0).toArray } // Called after all the entries are exhausted. def evaluate(buffer: Row) = { buffer.getList[String](0).asScala.toList.distinct } }
2) 다음과 같은 코드에서 UDAF를 사용
//define UDAF val CustomAggregation = new CustomAggregation() DataFrame .groupBy(col1,col2,col3) .agg(CustomAggregation(DataFrame(col5))).show()
from https://stackoverflow.com/questions/34202997/how-to-aggregate-values-into-collection-after-groupby by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 구문 설탕 : _ * 방법 매개 변수로 서열을 치료 (0) | 2019.11.05 |
---|---|
[SCALA] 고정 용량 및 사용자 정의 비교 사용해, PriorityQueue 구현이 있습니까? (0) | 2019.11.05 |
[SCALA] 어떻게 만들고 스칼라에서 다차원 배열을 사용 하는가? (0) | 2019.11.05 |
[SCALA] 스파크 : 조건부 dataframe에 열 추가 (0) | 2019.11.05 |
[SCALA] dataframe으로 csv 파일을 읽는 동안 스키마를 제공합니다 (0) | 2019.11.05 |