[SCALA] 스파크 RDD의 foreach는 내부 수정 수집
SCALA스파크 RDD의 foreach는 내부 수정 수집
나는 RDD의 요소를 반복하면서지도에 요소를 추가하려고 해요. 나는 오류를받지 못했습니다,하지만 수정은 발생되지 않습니다.
모든 것이 잘 직접 추가하거나 다른 컬렉션을 반복 작동합니다 :
scala> val myMap = new collection.mutable.HashMap[String,String]
myMap: scala.collection.mutable.HashMap[String,String] = Map()
scala> myMap("test1")="test1"
scala> myMap
res44: scala.collection.mutable.HashMap[String,String] = Map(test1 -> test1)
scala> List("test2", "test3").foreach(w => myMap(w) = w)
scala> myMap
res46: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)
그러나 나는 RDD에서 동일한 작업을 수행 할 때 :
scala> val fromFile = sc.textFile("tests.txt")
...
scala> fromFile.take(3)
...
res48: Array[String] = Array(test4, test5, test6)
scala> fromFile.foreach(w => myMap(w) = w)
scala> myMap
res50: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)
그것은 확실히 변수가 동일 만들기 위해 foreach는 전에, 그리고 제대로 인쇄 나는지도의 내용을 인쇄 시도했다 :
fromFile.foreach(w => println(myMap("test1")))
...
test1
test1
test1
...
나는 또한 foreach는 코드 내지도의 수정 된 요소를 인쇄했습니다 그리고는 수정 인쇄합니다,하지만 작업이 완료되면,지도 수정되지 않은 것 같다.
scala> fromFile.foreach({w => myMap(w) = w; println(myMap(w))})
...
test4
test5
test6
...
scala> myMap
res55: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test1 -> test1, test3 -> test3)
배열 (수집)도 잘 작동에 RDD 변환 :
fromFile.collect.foreach(w => myMap(w) = w)
scala> myMap
res89: scala.collection.mutable.HashMap[String,String] = Map(test2 -> test2, test5 -> test5, test1 -> test1, test4 -> test4, test6 -> test6, test3 -> test3)
이 컨텍스트의 문제인가? 나는 다른 곳에서 수정되는 데이터의 사본을 액세스하는 건가요?
해결법
-
==============================
1.스파크 클러스터 (하지 단일 시스템)에서 실행 때 명확해진다. RDD 이제 여러 시스템에 분산되어있다. 당신의 foreach를 호출 할 때, 당신은 가지고있는 RDD의 조각으로 무엇을해야 하는지를 각 시스템을 말한다. 당신이 어떤 지역 변수 (그리드에는 myMap 같은)를 참조하면, 그들은 직렬화 얻을 그들이 사용할 수 있도록, 기계로 전송. 그러나 아무것도 돌아 오지 않는다. 그래서에는 myMap의 원래 사본은 영향을받지 않습니다.
스파크 클러스터 (하지 단일 시스템)에서 실행 때 명확해진다. RDD 이제 여러 시스템에 분산되어있다. 당신의 foreach를 호출 할 때, 당신은 가지고있는 RDD의 조각으로 무엇을해야 하는지를 각 시스템을 말한다. 당신이 어떤 지역 변수 (그리드에는 myMap 같은)를 참조하면, 그들은 직렬화 얻을 그들이 사용할 수 있도록, 기계로 전송. 그러나 아무것도 돌아 오지 않는다. 그래서에는 myMap의 원래 사본은 영향을받지 않습니다.
나는이 질문에 응답 생각하지만, 분명히 당신이 뭔가를 달성하기 위해 노력하고있다 그리고 당신은이 방법으로도 얻을 수 없습니다. 여기에 또는 당신이 뭘 하려는지 별도의 질문에 설명 부담없이, 나는 도움말을 시도 할 것이다.
from https://stackoverflow.com/questions/23394286/modify-collection-inside-a-spark-rdd-foreach by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] "알 수없는 유물. scalatest 아니 해결 또는 인덱싱 "오류 (0) | 2019.11.12 |
---|---|
[SCALA] 왜 HDFS에 대한 한계는 2GB 스파크 RDD 파티션이 무엇입니까? (0) | 2019.11.12 |
[SCALA] 형의 클래스 인스턴스를 보장하기 위해 "부정적인"상황에 맞는 경계를 사용하면 범위에서 결석 (0) | 2019.11.12 |
[SCALA] 재생의 전류 루프의 인덱스를 가져 오기! 2 스칼라 템플릿 (0) | 2019.11.12 |
[SCALA] 병렬 스칼라 평행 모음 정도 (0) | 2019.11.12 |