복붙노트

[SCALA] 스파크 RDD의 foreach는 내부 수정 수집

SCALA

스파크 RDD의 foreach는 내부 수정 수집

나는 RDD의 요소를 반복하면서지도에 요소를 추가하려고 해요. 나는 오류를받지 못했습니다,하지만 수정은 발생되지 않습니다.

모든 것이 잘 직접 추가하거나 다른 컬렉션을 반복 작동합니다 :

scala> val myMap = new collection.mutable.HashMap[String,String]
myMap: scala.collection.mutable.HashMap[String,String] = Map()

scala> myMap("test1")="test1"

scala> myMap
res44: scala.collection.mutable.HashMap[String,String] = Map(test1 -> test1)

scala> List("test2", "test3").foreach(w => myMap(w) = w)

scala> myMap
res46: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

그러나 나는 RDD에서 동일한 작업을 수행 할 때 :

scala> val fromFile = sc.textFile("tests.txt")
...
scala> fromFile.take(3)
...
res48: Array[String] = Array(test4, test5, test6)

scala> fromFile.foreach(w => myMap(w) = w)
scala> myMap
res50: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

그것은 확실히 변수가 동일 만들기 위해 foreach는 전에, 그리고 제대로 인쇄 나는지도의 내용을 인쇄 시도했다 :

fromFile.foreach(w => println(myMap("test1")))
...
test1
test1
test1
...

나는 또한 foreach는 코드 내지도의 수정 된 요소를 인쇄했습니다 그리고는 수정 인쇄합니다,하지만 작업이 완료되면,지도 수정되지 않은 것 같다.

scala> fromFile.foreach({w => myMap(w) = w; println(myMap(w))})
...
test4
test5
test6
...
scala> myMap
res55: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)

배열 (수집)도 잘 작동에 RDD 변환 :

fromFile.collect.foreach(w => myMap(w) = w)
scala> myMap
res89: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test5 -> test5, test1 -> test1, test4 -> test4, test6 -> test6, test3 -> test3)

이 컨텍스트의 문제인가? 나는 다른 곳에서 수정되는 데이터의 사본을 액세스하는 건가요?

해결법

  1. ==============================

    1.스파크 클러스터 (하지 단일 시스템)에서 실행 때 명확해진다. RDD 이제 여러 시스템에 분산되어있다. 당신의 foreach를 호출 할 때, 당신은 가지고있는 RDD의 조각으로 무엇을해야 하는지를 각 시스템을 말한다. 당신이 어떤 지역 변수 (그리드에는 myMap 같은)를 참조하면, 그들은 직렬화 얻을 그들이 사용할 수 있도록, 기계로 전송. 그러나 아무것도 돌아 오지 않는다. 그래서에는 myMap의 원래 사본은 영향을받지 않습니다.

    스파크 클러스터 (하지 단일 시스템)에서 실행 때 명확해진다. RDD 이제 여러 시스템에 분산되어있다. 당신의 foreach를 호출 할 때, 당신은 가지고있는 RDD의 조각으로 무엇을해야 하는지를 각 시스템을 말한다. 당신이 어떤 지역 변수 (그리드에는 myMap 같은)를 참조하면, 그들은 직렬화 얻을 그들이 사용할 수 있도록, 기계로 전송. 그러나 아무것도 돌아 오지 않는다. 그래서에는 myMap의 원래 사본은 영향을받지 않습니다.

    나는이 질문에 응답 생각하지만, 분명히 당신이 뭔가를 달성하기 위해 노력하고있다 그리고 당신은이 방법으로도 얻을 수 없습니다. 여기에 또는 당신이 뭘 하려는지 별도의 질문에 설명 부담없이, 나는 도움말을 시도 할 것이다.

  2. from https://stackoverflow.com/questions/23394286/modify-collection-inside-a-spark-rdd-foreach by cc-by-sa and MIT license