복붙노트

[HADOOP] Spark + 스칼라 변환, 불변성 및 메모리 소비 간접비

HADOOP

Spark + 스칼라 변환, 불변성 및 메모리 소비 간접비

나는 Youtube에서 Spark 아키텍처에 관한 몇 가지 비디오를 보았습니다.

Lazy 평가, 실패시 데이터 생성의 탄력성, 좋은 함수 프로그래밍 개념이 Resilenace Distributed Datasets의 성공을위한 이유이기는하지만, 데이터 변형으로 인한 메모리 오버 헤드로 인한 다중 변환으로 인한 메모리 오버 헤드가 걱정됩니다.

개념을 올바르게 이해하면 모든 변환이 새로운 데이터 세트를 생성하므로 여러 번 메모리 요구 사항이 사라질 것입니다. 코드에서 10 개의 변형을 사용하면 10 세트의 데이터 세트가 생성되고 메모리 소비는 10 배 증가합니다.

e.

val textFile = sc.textFile("hdfs://...")
val counts = textFile.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

위의 예제에는 flatMap, map 및 reduceByKey의 세 가지 변형이 있습니다. X 크기의 데이터에 3X 메모리의 메모리가 필요하다는 의미입니까?

내 이해가 맞습니까? RDD 캐싱은이 문제를 해결할 수있는 유일한 해결책입니까?

일단 캐싱을 시작하면 디스크 IO 작업으로 인해 큰 크기와 성능이 영향을받을 수 있으므로 디스크로 넘칠 수 있습니다. 이 경우 Hadoop과 Spark의 성능은 비슷합니까?

편집하다:

답변과 의견에서 나는 게으른 초기화와 파이프 라인 프로세스를 이해했다. X가 초기 RDD 크기 인 3 X 메모리의 가정은 정확하지 않습니다.

그러나 1X RDD를 메모리에 캐시하고 pipleline을 통해 업데이트 할 수 있습니까? cache ()는 어떻게 작동합니까?

해결법

  1. ==============================

    1.우선, 지연 실행은 기능 구성이 발생할 수 있음을 의미합니다.

    우선, 지연 실행은 기능 구성이 발생할 수 있음을 의미합니다.

    scala> val rdd = sc.makeRDD(List("This is a test", "This is another test", 
                                     "And yet another test"), 1)
    rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[70] at makeRDD at <console>:27
    
    scala> val counts = rdd.flatMap(line => {println(line);line.split(" ")}).
         | map(word => {println(word);(word,1)}).
         | reduceByKey((x,y) => {println(s"$x+$y");x+y}).
         | collect
    This is a test
    This
    is
    a
    test
    This is another test
    This
    1+1
    is
    1+1
    another
    test
    1+1
    And yet another test
    And
    yet
    another
    1+1
    test
    2+1
    counts: Array[(String, Int)] = Array((And,1), (is,2), (another,2), (a,1), (This,2), (yet,1), (test,3))
    

    첫 번째는 병렬성을 1로 낮추어 단일 작업자에서 어떻게 보이는지 확인할 수 있습니다. 그런 다음 각 변형에 println을 추가하여 워크 플로가 어떻게 움직이는 지 확인할 수 있습니다. 라인을 처리 한 다음 해당 라인의 출력을 처리 한 다음 줄이기를 처리합니다. 따라서 제안한대로 각 변형에 대해 별도의 상태가 저장되지는 ​​않습니다. 대신 UI의 DAG 시각화에서 볼 수 있듯이 각 데이터 조각이 셔플이 필요할 때까지 전체 변환을 통해 반복됩니다.

    그것은 게으름에서의 승리입니다. Spark v Hadoop에 관해서는 이미 많은 부분이 있지만 (단지 Google), Spark가 네트워크 대역폭을 활용하여 사용하기 쉬운 경향이 있다는 것입니다. 그런 다음, 특히 스키마가 알려져 있고 DataFrames API를 활용할 수있는 경우 게으름으로 인해 많은 성능 향상을 얻을 수 있습니다.

    따라서 전반적으로 Spark는 MR이 모든면에서 손을.습니다.

  2. ==============================

    2.스파크 작업에 10 번의 변형이있을 경우, 스파크의 메모리 요구량은 10 배가 아닙니다. 작업에서 변환 단계를 지정할 때 Spark는 작업의 모든 단계를 실행할 수있는 DAG를 작성합니다. 그 다음에는 작업을 단계별로 나눕니다. 스테이지는 일련의 변환으로, Spark은 셔플없이 데이터 세트에서 실행할 수 있습니다.

    스파크 작업에 10 번의 변형이있을 경우, 스파크의 메모리 요구량은 10 배가 아닙니다. 작업에서 변환 단계를 지정할 때 Spark는 작업의 모든 단계를 실행할 수있는 DAG를 작성합니다. 그 다음에는 작업을 단계별로 나눕니다. 스테이지는 일련의 변환으로, Spark은 셔플없이 데이터 세트에서 실행할 수 있습니다.

    RDD에서 동작이 트리거되면 Spark는 DAG를 평가합니다. 스테이지의 끝 부분에 도달 할 때까지 스테이지의 모든 변형을 함께 적용하기 때문에 각 변형이 셔플로 이어지지 않는 한 메모리 압력이 10 시간이되지는 않습니다 (이 경우에는 잘못 작성된 작업 일 것입니다) .

    나는이 이야기를보고 슬라이드를 통해가는 것이 좋습니다.

  3. from https://stackoverflow.com/questions/35146482/spark-scala-transformations-immutability-memory-consumption-overheads by cc-by-sa and MIT license