복붙노트

[HADOOP] Spark를 사용하여 디렉토리에서 여러 파일 읽기

HADOOP

Spark를 사용하여 디렉토리에서 여러 파일 읽기

스파크를 사용하여 kaggle에서이 문제를 해결하기 위해 노력하고 있습니다.

입력의 계층 구조는 다음과 같습니다.

drivers/{driver_id}/trip#.csv
e.g., drivers/1/1.csv
      drivers/1/2.csv
      drivers/2/1.csv

나는 부모 디렉토리 "드라이버"를 읽고 각각의 하위 디렉토리에 대해 파일의 내용으로 (sub_directory, file_name) 및 값으로 key와 pairRDD를 만들고 싶습니다.

이 링크를 확인하고 사용하려고했습니다.

val text = sc.wholeTextFiles("drivers")
text.collect()

오류로 인해 실패했습니다 :

java.lang.ArrayIndexOutOfBoundsException: 0
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.<init>(CombineFileInputFormat.java:591)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:283)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:243)
    at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:267)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1779)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:885)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:884)

하지만 아래 코드를 실행하면 작동합니다.

val text =  sc.wholeTextFiles("drivers/1")
text.collect()

하지만 여기서는 디렉토리 드라이버를 읽고 파일을 반복하고 각 항목에 대해 wholeTextFiles를 호출해야하므로이 작업을 수행하고 싶지 않습니다.

해결법

  1. ==============================

    1.사용하는 대신

    사용하는 대신

    sc.textfile("path/*/**") or sc.wholeTextFiles("path/*")
    

    이 코드를 사용할 수 있습니다. spark는 내부적으로 폴더와 하위 폴더의 가능한 모든 값을 나열하므로 대용량 데이터 세트에 시간을 낭비 할 수 있습니다. 그 대신 동일한 목적으로 Unions를 사용할 수 있습니다.

    위치를 포함하는이 List 객체를 다음 코드 조각에 전달합니다. sc는 SQLContext의 객체입니다.

    var df: DataFrame = null;
      for (file <- files) {
        val fileDf= sc.textFile(file)
        if (df!= null) {
          df= df.unionAll(fileDf)
        } else {
          df= fileDf
        }
      }
    

    이제 최종 통합 RDD, 즉 df

  2. from https://stackoverflow.com/questions/31051107/read-multiple-files-from-a-directory-using-spark by cc-by-sa and MIT license