복붙노트

[HADOOP] spark java api에서 재귀 적으로 HDFS에서 모든 파일 읽기

HADOOP

spark java api에서 재귀 적으로 HDFS에서 모든 파일 읽기

나는 spark를 사용하여 디렉토리에서 하나의 RDD로 HDFS의 모든 파일의 데이터를 읽으며 하위 디렉토리도 사용합니다. 나는 그렇게 할 수있는 효율적인 방법을 찾지 못했습니다. 그래서 아래와 같이 사용자 정의 된 코드를 작성하려고했습니다.

public Object fetch(String source,String sink) {

    //reading data
    boolean isDir=new File(source).isDirectory();
    System.out.println("isDir="+isDir);
    JavaRDD<String> lines;
    if(isDir)
    {

        lines=readFiles(new File(source).listFiles(), null);
    }
    else
        lines= sc.textFile(source);

    lines.saveAsTextFile(sink);
    return true;
}

public static JavaRDD<String> readFiles(File[] files,JavaRDD<String> lines) {
    for (File file : files) {
        if (file.isDirectory()) {
            readFiles(file.listFiles(),lines); // Calls same method again.
        } 
        else {
            if(lines==null)
                lines=sc.textFile(file.getPath());
            else
            {
                JavaRDD<String> r=sc.textFile(file.getPath());
                lines.union(r);
            }
        }
    }
    return lines;
}

하지만 isDir은 그것이 디렉토리가 아니라는 것을 거짓으로 포함하고 있기 때문에 이것은 예상 한 작업을 수행하지 않습니다. 뭐가 잘못 됐는지 안내해 주실 수 있습니까? 이 일을하는 효율적인 방법이 있습니까? 고마워요.

해결법

  1. ==============================

    1.Spark은 Hadoop 작업 구성을 기반으로 데이터를 읽을 수 있으므로 FileInputFormat # setInputDirRecursive 메소드를 사용할 수 있습니다.

    Spark은 Hadoop 작업 구성을 기반으로 데이터를 읽을 수 있으므로 FileInputFormat # setInputDirRecursive 메소드를 사용할 수 있습니다.

    JavaSparkContext context = new JavaSparkContext();
    
    Job job;
    
    try {
      job = Job.getInstance();
      FileInputFormat.setInputPaths(job, new Path("/path/to/input/directory));
      FileInputFormat.setInputDirRecursive(job, true);
    } catch (IOException e1) {
      e1.printStackTrace();
      System.exit(1);
    }
    
    JavaRDD<Text> sourceData = context.newAPIHadoopRDD(job.getConfiguration(), TextInputFormat.class, LongWritable.class, Text.class)
      .values();
    

    분명히 String 대신 Text 데이터 유형을 사용하게 될 것입니다.

  2. ==============================

    2."*"문자는 폴더를 재귀 적으로 읽습니다.

    "*"문자는 폴더를 재귀 적으로 읽습니다.

    JavaSparkContext sc = new JavaSparkContext(conf);
    sc.textFile("/my/directory/*");
    

    자세한 정보는 다음 링크를 참조하십시오 :

    http://spark.apache.org/docs/latest/programming-guide.html#external-datasets

  3. ==============================

    3.그래서 마침내 나는 해결책을 찾았다. 나는 실수로 로컬 파일 시스템에서 파일을 읽는 데 사용되는 File 객체를 사용하고있었습니다. HDFS를 읽고 쓰려면 org.apache.hadoop.fs를 사용해야합니다. *

    그래서 마침내 나는 해결책을 찾았다. 나는 실수로 로컬 파일 시스템에서 파일을 읽는 데 사용되는 File 객체를 사용하고있었습니다. HDFS를 읽고 쓰려면 org.apache.hadoop.fs를 사용해야합니다. *

    그래서 여기에 해결책이있다.

    public Object fetch(String source,String sink) {
    
        //reading data
        Path src=new Path(source);
        try {
            if(fs.exists(src))
            {
                FileStatus[] lists=fs.listStatus(src);
                readFiles(lists);
            }
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return true;
    }
    
    public void readFiles(FileStatus[] files) {
            for(int i=0;i<files.length;i++)
            {
                if(files[i].isDirectory())
                {
                    try {
                        readFiles(fs.listStatus(files[i].getPath()));
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                else
                {
                        if(lines==null)
                        {
                           Path p=files[i].getPath();
                           JavaRDD<String> lines=sc.textFile(p.toString());
                        }
                        else
                       {
                          JavaRDD<String> r=sc.textFile(file.getPath());
                          lines.union(r);
                       }
            }
      }
      return lines;
     }
    
  4. from https://stackoverflow.com/questions/25782925/reading-all-files-from-hdfs-recursively-in-spark-java-api by cc-by-sa and MIT license