[SCALA] 스파크 데이터 세트에서 자신의 reduceByKey 롤링
SCALA스파크 데이터 세트에서 자신의 reduceByKey 롤링
좀 더 RDDs에 추가 DataFrames 및 데이터 세트를 사용하는 방법을 배우게하기 위해 노력하고있어. RDD를 들어, 내가 someRDD.reduceByKey ((X, Y) => X + Y)를 할 수 알지만, 데이터 집합에 대한 해당 기능을 볼 수 없습니다. 그래서 하나를 쓰기로했다.
someRdd.map(x => ((x.fromId,x.toId),1)).map(x => collection.mutable.Map(x)).reduce((x,y) => {
val result = mutable.HashMap.empty[(Long,Long),Int]
val keys = mutable.HashSet.empty[(Long,Long)]
y.keys.foreach(z => keys += z)
x.keys.foreach(z => keys += z)
for (elem <- keys) {
val s1 = if(x.contains(elem)) x(elem) else 0
val s2 = if(y.contains(elem)) y(elem) else 0
result(elem) = s1 + s2
}
result
})
그러나,이 드라이버에 이르기를 반환합니다. 당신은 어떻게이 데이터 집합을 반환 쓸 것인가? 어쩌면 mapPartition과이 있습니까?
이 컴파일을 참고하지만 아직지도에 대한 인코더가 없기 때문에 실행되지 않습니다
해결법
-
==============================
1.나는 당신의 목표는 데이터 집합이 관용구를 번역하는 가정 :
나는 당신의 목표는 데이터 집합이 관용구를 번역하는 가정 :
rdd.map(x => (x.someKey, x.someField)) .reduceByKey(_ + _) // => returning an RDD of (KeyType, FieldType)
현재, 가장 가까운 솔루션이 같은 데이터 집합 API의 외모 발견했다 :
ds.map(x => (x.someKey, x.someField)) // [1] .groupByKey(_._1) .reduceGroups((a, b) => (a._1, a._2 + b._2)) .map(_._2) // [2] // => returning a Dataset of (KeyType, FieldType) // Comments: // [1] As far as I can see, having a map before groupByKey is required // to end up with the proper type in reduceGroups. After all, we do // not want to reduce over the original type, but the FieldType. // [2] required since reduceGroups converts back to Dataset[(K, V)] // not knowing that our V's are already key-value pairs.
그래서 아마 우리가 여기서 뭔가를 누락, 매우 우아하고도 훨씬 덜 확대됨에있는 빠른 벤치 마크에 따라 보이지 않는 ...
참고 : (. _ someKey)이 대안은 첫 번째 단계로 GroupByKey에서를 사용할 수 있습니다. 문제는 GroupByKey에서를 사용하는 것은 KeyValueGroupedDataset에 정기적으로 데이터 집합에서 유형을 변경한다는 것입니다. 후자는 일반지도 기능이 없습니다. 대신 그것이 문서화 문자열에 따라 반복자 및 수행에 셔플 값을 감싸고 있기 때문에 매우 편리 보이지 않는 mapGroups을 제공합니다.
-
==============================
2.GroupByKey에서 이전보다 효율적인 솔루션 사용의 mapPartitions은 셔플의 양을 줄일 수 (이 reduceByKey과 동일한 서명하지 유의 그러나 나는 데이터 세트는 튜플로 구성 요구보다 기능을 전달하는 더 유연한 생각)합니다.
GroupByKey에서 이전보다 효율적인 솔루션 사용의 mapPartitions은 셔플의 양을 줄일 수 (이 reduceByKey과 동일한 서명하지 유의 그러나 나는 데이터 세트는 튜플로 구성 요구보다 기능을 전달하는 더 유연한 생각)합니다.
def reduceByKey[V: ClassTag, K](ds: Dataset[V], f: V => K, g: (V, V) => V) (implicit encK: Encoder[K], encV: Encoder[V]): Dataset[(K, V)] = { def h[V: ClassTag, K](f: V => K, g: (V, V) => V, iter: Iterator[V]): Iterator[V] = { iter.toArray.groupBy(f).mapValues(_.reduce(g)).map(_._2).toIterator } ds.mapPartitions(h(f, g, _)) .groupByKey(f)(encK) .reduceGroups(g) }
데이터의 형상 / 크기에 따라,이 reduceByKey 성능 1 초 이내이고, 약 2 배 빨리 GroupByKey에서만큼 (_._ 1) .reduceGroups. 이 개선의 여지가 여전히, 그래서 제안을 환영 할 것이다.
from https://stackoverflow.com/questions/38383207/rolling-your-own-reducebykey-in-spark-dataset by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 스칼라에서 어떻게 SQL SUM 및 GROUP BY에 해당 할 수 있습니까? (0) | 2019.11.28 |
---|---|
[SCALA] 놀이! 프레임 워크 : 템플릿에 변수를 정의? [복제] (0) | 2019.11.28 |
[SCALA] 스파크 / 스칼라 dataframe에서의 하나 개의 컬럼의 값을 합산하는 방법 (0) | 2019.11.28 |
[SCALA] 추가하거나 스칼라에서 튜플에 요소를 앞에 추가하는 방법 (0) | 2019.11.28 |
[SCALA] 스칼라 : 나는 스칼라를 사용 Dataframes에 값을 대체 할 수있는 방법 (0) | 2019.11.28 |