복붙노트

[HADOOP] 반복자와 함께 mapPartition을 사용하여 스파크 RDD 저장

HADOOP

반복자와 함께 mapPartition을 사용하여 스파크 RDD 저장

HDFS 및 로컬에도 저장 해야하는 중간 데이터가 있습니다. Spark 1.6을 사용하고 있습니다. 중간 형태의 HDFS에서는 / output / testDummy / part-00000 및 / output / testDummy / part-00001에 데이터가 있습니다. Java / Scala를 사용하여 이러한 파티션을 로컬에 저장하여 /users/home/indexes/index.nt (로컬로 병합하여) 또는 /users/home/indexes/index-0000.nt 및 /home/indexes/index-0001.nt는 별도로 제공됩니다.

내 코드는 다음과 같습니다. 참고 : testDummy는 test와 동일하며 출력에는 두 개의 파티션이 있습니다. 별도로 저장하거나 index.nt 파일과 로컬로 저장하고 싶습니다. 두 개의 데이터 노드에 별도로 저장하는 것을 선호합니다. YARN에서 클러스터를 사용하고 스파크 작업을 제출하고 있습니다. 또한 몇 번의 의견과 어떤 데이터를 얻고 있는지에 대한 의견도 추가했습니다. 어떻게해야합니까? 도움을 주시면 감사하겠습니다.

 val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS+"/testDummy")
 println("testDummy done")   //1 time print

def savesData(iterator: Iterator[(String)]): Iterator[(String)] = {
    println("Inside savesData")                                 //  now 4 times when coalesce(Constants.INITIAL_PARTITIONS)=2
    println("iter size"+iterator.size)                           //  2 735 2 735 values
    val filenamesWithExtension = outputPath + "/index.nt"
    println("filenamesWithExtension "+filenamesWithExtension.length)   //4 times
    var list = List[(String)]()

    val fileWritter = new FileWriter(filenamesWithExtension,true)
    val bufferWritter = new BufferedWriter(fileWritter)

     while (iterator.hasNext){                       //iterator.hasNext is false
       println("inside iterator")                    //0 times 
       val dat = iterator.next()
       println("datadata "+iterator.next())

       bufferWritter.write(dat + "\n")
       bufferWritter.flush()
       println("index files written")

       val dataElements = dat.split(" ")
       println("dataElements")                                    //0
       list = list.::(dataElements(0))
       list = list.::(dataElements(1))
       list = list.::(dataElements(2))
     }
    bufferWritter.close() //closing
    println("savesData method end")                         //4 times when coal=2
    list.iterator
}

println("before saving data into local")                              //1
val test = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).mapPartitions(savesData)
println("testRDD partitions "+test.getNumPartitions)                               //2
println("testRDD size "+test.collect().length)                                //0
println("after saving data into local")   //1

추신 : 나는 이것과 이것을 찾았지만 찾고있는 것과 정확히 같지 않았습니다. 어쨌든 index.nt에 아무것도 얻지 못했습니다.

해결법

  1. ==============================

    1.몇 가지 :

    몇 가지 :

  2. ==============================

    2.Java 7은 디렉토리를 감시하는 수단을 제공합니다.

    Java 7은 디렉토리를 감시하는 수단을 제공합니다.

    https://docs.oracle.com/javase/tutorial/essential/io/notification.html

    아이디어는 감시 서비스를 작성하고, 관심있는 디렉토리에 등록 (파일 작성, 삭제 등과 같은 관심있는 이벤트 언급), 감시하는 것입니다. 작성, 삭제, 등 원하는 조치를 취할 수 있습니다.

    해당되는 경우 Java hdfs API에 크게 의존해야합니다.

    이벤트가 영원히 대기하기 때문에 백그라운드에서 프로그램을 실행하십시오. (원하는대로 종료 한 후 로직을 작성할 수 있습니다)

    반면에 쉘 스크립팅도 도움이 될 것입니다.

    파일을 읽는 동안 hdfs 파일 시스템의 일관성 모델을 알고 있어야합니다.

    희망이 도움이되기를 바랍니다.

  3. from https://stackoverflow.com/questions/38044231/save-a-spark-rdd-using-mappartition-with-iterator by cc-by-sa and MIT license