복붙노트

[SCALA] reduceByKey : 어떻게 내부적으로 작동합니까?

SCALA

reduceByKey : 어떻게 내부적으로 작동합니까?

나는 스파크 및 스칼라 새로운 오전. 나는 스파크의 방법 reduceByKey 기능의 작동에 대해 혼란스러워했다. 우리는 다음과 같은 코드가 있다고 가정 :

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

지도 기능은 분명하다의 키이며 data.txt로에서 라인을 가리키는 1의 값입니다.

reduceByKey가 내부적으로 어떻게 작동하는지 그러나, 나는하지 않았다? 키에 "A"지점합니까? 또한, "S"을 "A"지점을합니까? 그리고 무엇은 +의 B를 표현합니까? 그들은 어떻게 채워?

해결법

  1. ==============================

    1.의 이산 방법과 유형을 분해 할 수 있습니다. 그것은 일반적으로 새로운 DEVS의 복잡한 노출 :

    의 이산 방법과 유형을 분해 할 수 있습니다. 그것은 일반적으로 새로운 DEVS의 복잡한 노출 :

    pairs.reduceByKey((a, b) => a + b)
    

    가된다

    pairs.reduceByKey((a: Int, b: Int) => a + b)
    

    및 변수의 이름을 변경하는 것은 좀 더 명시합니다

    pairs.reduceByKey((accumulatedValue: Int, currentValue: Int) => accumulatedValue + currentValue)
    

    그래서 우리는 지금 우리가 단순히 주어진 키에 대한 누적 값을 복용하고 해당 키의 다음 값을 합산하는 것을 볼 수 있습니다. 우리는 핵심 부분을 이해할 수 있도록 지금, 더 그것을 깰 수 있습니다. 자,이 방법보다이 같은 시각화하자 :

    pairs.reduce((accumulatedValue: List[(String, Int)], currentValue: (String, Int)) => {
      //Turn the accumulated value into a true key->value mapping
      val accumAsMap = accumulatedValue.toMap   
      //Try to get the key's current value if we've already encountered it
      accumAsMap.get(currentValue._1) match { 
        //If we have encountered it, then add the new value to the existing value and overwrite the old
        case Some(value : Int) => (accumAsMap + (currentValue._1 -> (value + currentValue._2))).toList
        //If we have NOT encountered it, then simply add it to the list
        case None => currentValue :: accumulatedValue 
      }
    })
    

    그래서, 당신은 reduceByKey 키를 발견하고 그 부분을 관리하는 방법에 대한 걱정을 할 필요가 없도록 그것을 추적하는 보일러 걸리는 것을 볼 수 있습니다.

    당신이 원하는 경우에 더 깊은, 더 진실

    모든 여기에 완료 일부 최적화이 있기 때문에 일어나는의 단순화 된 버전입니다 말했다되고있다. 이 작업은, 연관되는 스파크 엔진이 먼저 로컬이 감소를 수행 할 수 있도록 드라이버에서 다시 한 번 다음 (보통이라고지도 측 감소)합니다. 이 네트워크 트래픽을 절약; 대신에 모든 데이터를 전송하고, 동작을 수행하는, 그것이 가능한 한 소형으로 감소하고 그 위에 와이어 감소를 보낼 수있다.

  2. ==============================

    2.reduceByKey 기능에 대한 요구 사항 중 하나는 연관해야합니다 것입니다. reduceByKey 작동 방법에 대한 직관을 구축하기 위해, 먼저 연관 연관 함수는 병렬 계산에 도움이 방법을 보자 :

    reduceByKey 기능에 대한 요구 사항 중 하나는 연관해야합니다 것입니다. reduceByKey 작동 방법에 대한 직관을 구축하기 위해, 먼저 연관 연관 함수는 병렬 계산에 도움이 방법을 보자 :

    우리가 볼 수 있듯이, 우리는 조각과 연관 함수를 적용하여 원래의 콜렉션을 깰 수 있습니다, 우리는 총을 축적 할 수 있습니다. 순차 경우, 우리는 그것을 사용 사소한 : 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 + 10.

    연관성은 우리가 순서대로 병렬로 같은 기능을 사용할 수 있습니다. reduceByKey 파티션 이루어지는 분산 컬렉션 인 RDD의 결과를 계산하기 위해 그 특성을 이용한다.

    다음 예를 살펴 보겠습니다 :

    // collection of the form ("key",1),("key,2),...,("key",20) split among 4 partitions
    val rdd =sparkContext.parallelize(( (1 to 20).map(x=>("key",x))), 4)
    rdd.reduceByKey(_ + _)
    rdd.collect()
    > Array[(String, Int)] = Array((key,210))
    

    불꽃에서, 데이터 파티션에 분배된다. 다음 설명을 위해, (4) 파티션 얇은 라인 안의 왼쪽한다. 첫째, 우리는 순차 분할, 각 파티션에 로컬 함수를 적용하지만, 우리는 병렬로 4 개 분할을 실행. 그리고, 각 로컬 계산의 결과는 그 결과에 와서 다시 최종적으로 동일한 기능을 적용하여 응집된다.

    (병렬로)의 각 파티션의 결과 사이에인가되는 각 (순차적으로) 하나의 파티션에 적용되는 하나 reduceByKey는 aggregateByKey aggregateByKey의 특성화 2 개 기능을 취한다. reduceByKey는 두 경우 모두에서 동일한 연관 함수를 사용하여 각 파티션에서 순차적 컴퓨팅을하고 우리는 여기에 설명대로 최종 결과에 그 결과를 결합 할 수 있습니다.

  3. ==============================

    3.당신의 예에서

    당신의 예에서

    val counts = pairs.reduceByKey((a,b) => a+b)
    

    A 및 B 쌍에 튜플 _2 모두 지능 축전지이다. reduceKey 새로운 튜플 [문자열 지능]의 제조, 동일한 값의 두 튜플을하고, B 등 _2 그 값을 사용한다. 각 키의 단 하나의 튜플이있을 때까지이 작업을 반복한다.

    첫 번째 요소는 항상 누적 두 번째 값입니다 비 스파크 (또는, 정말, 비 병렬) reduceByKey 달리, reduceByKey 분산 방식으로 작동, 즉, 각 노드는 uniquely-의 모음으로 튜플의 그것의 세트를 줄일 수 튜플의 최종 고유-키 입력 세트가있을 때까지 키 입력 튜플과는 여러 노드에서 튜플을 줄일 수 있습니다. 노드의 결과로서 이러한 수단은 감소되어, A와 B는 이미 축전지 감소 나타낸다.

  4. ==============================

    4.스파크 RDD reduceByKey 기능 연관 기능을 감소하여 각 키에 대한 값을 병합.

    스파크 RDD reduceByKey 기능 연관 기능을 감소하여 각 키에 대한 값을 병합.

    reduceByKey 기능 만 RDDs에서 작동하며 이는 느리게 평가 수단은 변환 동작이다. 연관 기능 및 소스 RDD인가 한 결과 새로운 RDD를 생성하는 파라미터로서 전달된다.

    그래서 예에서, RDD 쌍 등 여러 쌍의 소자의 세트가 (S1,1), (S2,1) 그리고 reduceByKey 어큐뮬레이터를 초기화하는 기능 (어큐뮬레이터, N) => (어큐뮬레이터 + N)를 수용 등 변수는 0 값을 기본값으로 각 키의 요소를 추가하고 키 쌍의 총 수를 갖는 결과 RDD 수를 반환합니다.

  5. from https://stackoverflow.com/questions/30145329/reducebykey-how-does-it-work-internally by cc-by-sa and MIT license