복붙노트

[HADOOP] 어떻게 Spark Scala 쉘 내의 HDFS 위치에있는 모든 csv 파일을 나열 할 수 있습니까?

HADOOP

어떻게 Spark Scala 쉘 내의 HDFS 위치에있는 모든 csv 파일을 나열 할 수 있습니까?

이 목적은 HDFS의 두 번째 위치에서 각 데이터 파일의 사본을 조작하고 저장하기위한 것입니다. 나는 사용할 것이다.

RddName.coalesce(1).saveAsTextFile(pathName)

결과를 HDFS에 저장합니다.

이것은 성능이 효율적이지 않을 것이라고 확신하지만 각각의 파일을 개별적으로 수행하고자하는 이유입니다. 그러나 CSV 파일 경로 목록을 문자열 배열에 저장 한 다음 각 RDD를 반복하여 루프하는 방법을 아직 결정하지 않았습니다.

HDFS 소스 위치로 다음과 같은 익명의 예를 사용합니다.

/data/email/click/date=2015-01-01/sent_20150101.csv
/data/email/click/date=2015-01-02/sent_20150102.csv
/data/email/click/date=2015-01-03/sent_20150103.csv

Hadoop FS Shell을 사용하여 파일 경로를 나열하는 방법을 알고 있습니다.

HDFS DFS -ls /data/email/click/*/*.csv

모든 데이터에 대해 하나의 RDD를 만드는 방법을 알고 있습니다.

val sentRdd = sc.textFile( "/data/email/click/*/*.csv" )

해결법

  1. ==============================

    1.나는 그것을 철저히 테스트하지는 않았지만 이런 식으로 작동하는 것처럼 보인다 :

    나는 그것을 철저히 테스트하지는 않았지만 이런 식으로 작동하는 것처럼 보인다 :

    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.hadoop.fs.{FileSystem, Path, LocatedFileStatus, RemoteIterator}
    import java.net.URI
    
    val path: String = ???
    
    val hconf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
    val hdfs = FileSystem.get(hconf)
    val iter = hdfs.listFiles(new Path(path), false)
    
    def listFiles(iter: RemoteIterator[LocatedFileStatus]) = {
      def go(iter: RemoteIterator[LocatedFileStatus], acc: List[URI]): List[URI] = {
        if (iter.hasNext) {
          val uri = iter.next.getPath.toUri
          go(iter, uri :: acc)
        } else {
          acc
        }
      }
      go(iter, List.empty[java.net.URI])
    }
    
    listFiles(iter).filter(_.toString.endsWith(".csv"))
    
  2. ==============================

    2.이것은 궁극적으로 나를 위해 일한 것입니다 :

    이것은 궁극적으로 나를 위해 일한 것입니다 :

    import org.apache.hadoop.fs._
    import org.apache.spark.deploy.SparkHadoopUtil
    import java.net.URI
    
    val hdfs_conf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
    val hdfs = FileSystem.get(hdfs_conf)
    // source data in HDFS
    val sourcePath = new Path("/<source_location>/<filename_pattern>")
    
    hdfs.globStatus( sourcePath ).foreach{ fileStatus =>
       val filePathName = fileStatus.getPath().toString()
       val fileName = fileStatus.getPath().getName()
    
       // < DO STUFF HERE>
    
    } // end foreach loop
    
  3. ==============================

    3.sc.wholeTextFiles (path)가 도움이됩니다. 그것은 (파일 경로, filecontent)의 rdd를 제공합니다.

    sc.wholeTextFiles (path)가 도움이됩니다. 그것은 (파일 경로, filecontent)의 rdd를 제공합니다.

  4. from https://stackoverflow.com/questions/32771089/how-can-one-list-all-csv-files-in-an-hdfs-location-within-the-spark-scala-shell by cc-by-sa and MIT license