[HADOOP] 반복자와 함께 mapPartition을 사용하여 스파크 RDD 저장
HADOOP반복자와 함께 mapPartition을 사용하여 스파크 RDD 저장
HDFS 및 로컬에도 저장 해야하는 중간 데이터가 있습니다. Spark 1.6을 사용하고 있습니다. 중간 형태의 HDFS에서는 / output / testDummy / part-00000 및 / output / testDummy / part-00001에 데이터가 있습니다. Java / Scala를 사용하여 이러한 파티션을 로컬에 저장하여 /users/home/indexes/index.nt (로컬로 병합하여) 또는 /users/home/indexes/index-0000.nt 및 /home/indexes/index-0001.nt는 별도로 제공됩니다.
내 코드는 다음과 같습니다. 참고 : testDummy는 test와 동일하며 출력에는 두 개의 파티션이 있습니다. 별도로 저장하거나 index.nt 파일과 로컬로 저장하고 싶습니다. 두 개의 데이터 노드에 별도로 저장하는 것을 선호합니다. YARN에서 클러스터를 사용하고 스파크 작업을 제출하고 있습니다. 또한 몇 번의 의견과 어떤 데이터를 얻고 있는지에 대한 의견도 추가했습니다. 어떻게해야합니까? 도움을 주시면 감사하겠습니다.
val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS+"/testDummy")
println("testDummy done") //1 time print
def savesData(iterator: Iterator[(String)]): Iterator[(String)] = {
println("Inside savesData") // now 4 times when coalesce(Constants.INITIAL_PARTITIONS)=2
println("iter size"+iterator.size) // 2 735 2 735 values
val filenamesWithExtension = outputPath + "/index.nt"
println("filenamesWithExtension "+filenamesWithExtension.length) //4 times
var list = List[(String)]()
val fileWritter = new FileWriter(filenamesWithExtension,true)
val bufferWritter = new BufferedWriter(fileWritter)
while (iterator.hasNext){ //iterator.hasNext is false
println("inside iterator") //0 times
val dat = iterator.next()
println("datadata "+iterator.next())
bufferWritter.write(dat + "\n")
bufferWritter.flush()
println("index files written")
val dataElements = dat.split(" ")
println("dataElements") //0
list = list.::(dataElements(0))
list = list.::(dataElements(1))
list = list.::(dataElements(2))
}
bufferWritter.close() //closing
println("savesData method end") //4 times when coal=2
list.iterator
}
println("before saving data into local") //1
val test = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).mapPartitions(savesData)
println("testRDD partitions "+test.getNumPartitions) //2
println("testRDD size "+test.collect().length) //0
println("after saving data into local") //1
추신 : 나는 이것과 이것을 찾았지만 찾고있는 것과 정확히 같지 않았습니다. 어쨌든 index.nt에 아무것도 얻지 못했습니다.
해결법
-
==============================
1.몇 가지 :
몇 가지 :
-
==============================
2.Java 7은 디렉토리를 감시하는 수단을 제공합니다.
Java 7은 디렉토리를 감시하는 수단을 제공합니다.
https://docs.oracle.com/javase/tutorial/essential/io/notification.html
아이디어는 감시 서비스를 작성하고, 관심있는 디렉토리에 등록 (파일 작성, 삭제 등과 같은 관심있는 이벤트 언급), 감시하는 것입니다. 작성, 삭제, 등 원하는 조치를 취할 수 있습니다.
해당되는 경우 Java hdfs API에 크게 의존해야합니다.
이벤트가 영원히 대기하기 때문에 백그라운드에서 프로그램을 실행하십시오. (원하는대로 종료 한 후 로직을 작성할 수 있습니다)
반면에 쉘 스크립팅도 도움이 될 것입니다.
파일을 읽는 동안 hdfs 파일 시스템의 일관성 모델을 알고 있어야합니다.
희망이 도움이되기를 바랍니다.
from https://stackoverflow.com/questions/38044231/save-a-spark-rdd-using-mappartition-with-iterator by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] Spark Scala를 사용하여 HiveContext를 사용하여 Hive 테이블에 데이터 삽입 (0) | 2019.08.08 |
---|---|
[HADOOP] sqoop 가져 오기 또는 내보내기 중 잘못된 레코드 처리 (0) | 2019.08.08 |
[HADOOP] 디렉토리에 하위 디렉토리가 있습니까? (0) | 2019.08.08 |
[HADOOP] HDFS 쓰기 결과 "CreateSymbolicLink 오류 (1314) : 클라이언트가 필요한 권한을 가지고 있지 않습니다." (0) | 2019.08.08 |
[HADOOP] hadoop 2.0을 위해 mahout을 어떻게 컴파일 / 사용할 수 있습니까? (0) | 2019.08.08 |