복붙노트

[SCALA] 스파크 데이터 세트에서 자신의 reduceByKey 롤링

SCALA

스파크 데이터 세트에서 자신의 reduceByKey 롤링

좀 더 RDDs에 추가 DataFrames 및 데이터 세트를 사용하는 방법을 배우게하기 위해 노력하고있어. RDD를 들어, 내가 someRDD.reduceByKey ((X, Y) => X + Y)를 할 수 알지만, 데이터 집합에 대한 해당 기능을 볼 수 없습니다. 그래서 하나를 쓰기로했다.

someRdd.map(x => ((x.fromId,x.toId),1)).map(x => collection.mutable.Map(x)).reduce((x,y) => {
  val result = mutable.HashMap.empty[(Long,Long),Int]
  val keys = mutable.HashSet.empty[(Long,Long)]
  y.keys.foreach(z => keys += z)
  x.keys.foreach(z => keys += z)
  for (elem <- keys) {
    val s1 = if(x.contains(elem)) x(elem) else 0
    val s2 = if(y.contains(elem)) y(elem) else 0
    result(elem) = s1 + s2
  }
  result
})

그러나,이 드라이버에 이르기를 반환합니다. 당신은 어떻게이 데이터 집합을 반환 쓸 것인가? 어쩌면 mapPartition과이 있습니까?

이 컴파일을 참고하지만 아직지도에 대한 인코더가 없기 때문에 실행되지 않습니다

해결법

  1. ==============================

    1.나는 당신의 목표는 데이터 집합이 관용구를 번역하는 가정 :

    나는 당신의 목표는 데이터 집합이 관용구를 번역하는 가정 :

    rdd.map(x => (x.someKey, x.someField))
       .reduceByKey(_ + _)
    
    // => returning an RDD of (KeyType, FieldType)
    

    현재, 가장 가까운 솔루션이 같은 데이터 집합 API의 외모 발견했다 :

    ds.map(x => (x.someKey, x.someField))          // [1]
      .groupByKey(_._1)                            
      .reduceGroups((a, b) => (a._1, a._2 + b._2))
      .map(_._2)                                   // [2]
    
    // => returning a Dataset of (KeyType, FieldType)
    
    // Comments:
    // [1] As far as I can see, having a map before groupByKey is required
    //     to end up with the proper type in reduceGroups. After all, we do
    //     not want to reduce over the original type, but the FieldType.
    // [2] required since reduceGroups converts back to Dataset[(K, V)]
    //     not knowing that our V's are already key-value pairs.
    

    그래서 아마 우리가 여기서 뭔가를 누락, 매우 우아하고도 훨씬 덜 확대됨에있는 빠른 벤치 마크에 따라 보이지 않는 ...

    참고 : (. _ someKey)이 대안은 첫 번째 단계로 GroupByKey에서를 사용할 수 있습니다. 문제는 GroupByKey에서를 사용하는 것은 KeyValueGroupedDataset에 정기적으로 데이터 집합에서 유형을 변경한다는 것입니다. 후자는 일반지도 기능이 없습니다. 대신 그것이 문서화 문자열에 따라 반복자 및 수행에 셔플 값을 감싸고 있기 때문에 매우 편리 보이지 않는 mapGroups을 제공합니다.

  2. ==============================

    2.GroupByKey에서 이전보다 효율적인 솔루션 사용의 mapPartitions은 셔플의 양을 줄일 수 (이 reduceByKey과 동일한 서명하지 유의 그러나 나는 데이터 세트는 튜플로 구성 요구보다 기능을 전달하는 더 유연한 생각)합니다.

    GroupByKey에서 이전보다 효율적인 솔루션 사용의 mapPartitions은 셔플의 양을 줄일 수 (이 reduceByKey과 동일한 서명하지 유의 그러나 나는 데이터 세트는 튜플로 구성 요구보다 기능을 전달하는 더 유연한 생각)합니다.

    def reduceByKey[V: ClassTag, K](ds: Dataset[V], f: V => K, g: (V, V) => V)
      (implicit encK: Encoder[K], encV: Encoder[V]): Dataset[(K, V)] = {
      def h[V: ClassTag, K](f: V => K, g: (V, V) => V, iter: Iterator[V]): Iterator[V] = {
        iter.toArray.groupBy(f).mapValues(_.reduce(g)).map(_._2).toIterator
      }
      ds.mapPartitions(h(f, g, _))
        .groupByKey(f)(encK)
        .reduceGroups(g)
    }
    

    데이터의 형상 / 크기에 따라,이 reduceByKey 성능 1 초 이내이고, 약 2 배 빨리 GroupByKey에서만큼 (_._ 1) .reduceGroups. 이 개선의 여지가 여전히, 그래서 제안을 환영 할 것이다.

  3. from https://stackoverflow.com/questions/38383207/rolling-your-own-reducebykey-in-spark-dataset by cc-by-sa and MIT license