복붙노트

[SCALA] 기능 스파크에 빈 목록을 반환

SCALA

기능 스파크에 빈 목록을 반환

다음은 압축 파일에 파일 이름의 목록을 가져 오는 코드는

def getListOfFilesInRepo(zipFileRDD : RDD[(String,PortableDataStream)]) : (List[String]) = {
    val zipInputStream = zipFileRDD.values.map(x => new ZipInputStream(x.open))
    val filesInZip =  new ArrayBuffer[String]()
    var ze : Option[ZipEntry] = None
    zipInputStream.foreach(stream =>{
      do{
        ze = Option(stream.getNextEntry);
        ze.foreach{ze =>
          if(ze.getName.endsWith("java") && !ze.isDirectory()){
            var fileName:String = ze.getName.substring(ze.getName.lastIndexOf("/")+1,ze.getName.indexOf(".java"))
            filesInZip += fileName
          }
        }
        stream.closeEntry()
      } while(ze.isDefined)
      println(filesInZip.toList.length) // print 889 (correct)
    })
    println(filesInZip.toList.length) // print 0 (WHY..?)
    (filesInZip.toList)
  }

나는 다음과 같은 방법으로 위의 코드를 실행합니다 :

scala> val zipFileRDD = sc.binaryFiles("./handsOn/repo~apache~storm~14135470~false~Java~master~2210.zip")
zipFileRDD: org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)] = ./handsOn/repo~apache~storm~14135470~false~Java~master~2210.zip BinaryFileRDD[17] at binaryFiles at <console>:25

scala> getListOfFilesInRepo(zipRDD)
889
0
res12: List[String] = List()

왜 내가 889을 받고 있지 않다 대신 0을 받고?

해결법

  1. ==============================

    1.filesInZip 노동자 사이에 공유되지 않기 때문에이 문제가 발생합니다. foreach는 filesInZip은 로컬 복사본상에서 동작하고 완료 될 때,이 사본은 단순히 폐기되고 가비지 수집된다. 당신이 결과를 유지하려면 당신은 변환 (가장 가능성이 flatMap)를 사용하여 수집 집계 값을 반환해야합니다.

    filesInZip 노동자 사이에 공유되지 않기 때문에이 문제가 발생합니다. foreach는 filesInZip은 로컬 복사본상에서 동작하고 완료 될 때,이 사본은 단순히 폐기되고 가비지 수집된다. 당신이 결과를 유지하려면 당신은 변환 (가장 가능성이 flatMap)를 사용하여 수집 집계 값을 반환해야합니다.

    def listFiles(stream: PortableDataStream): TraversableOnce[String] = ???
    
    zipInputStream.flatMap(listFiles)
    

    당신은 이해 폐쇄에서 더 많은 것을 배울 수

  2. from https://stackoverflow.com/questions/34178718/function-returns-an-empty-list-in-spark by cc-by-sa and MIT license