복붙노트

[HADOOP] hdfs 파일 병합

HADOOP

hdfs 파일 병합

나는 HDFS에서 N_fileName.txt에 대한 1_fileName.txt의 이름 지정 규칙을 사용하여 1000 개 이상의 파일을 사용할 수 있습니다. 각 파일의 크기는 1024MB입니다. 파일의 순서를 유지하면서이 파일을 하나의 파일 (HDFS)에 병합해야합니다. 5_FileName.txt는 4_fileName.txt 이후에만 추가해야한다고 가정합니다.

이 작업을 수행하는 가장 빠르고 가장 빠른 방법은 무엇입니까?

데이터 노드간에 실제 데이터를 복사하지 않고이 병합을 수행 할 수있는 방법이 있습니까? e-g의 경우 :이 파일의 블록 위치를 가져오고이 블록 위치가있는 네임 노드에 새 항목 (FileName)을 만듭니다.

해결법

  1. ==============================

    1.이 작업을 효율적으로 수행 할 방법이 없으면 모든 데이터를 하나의 노드로 이동 한 다음 HDFS로 다시 이동해야합니다.

    이 작업을 효율적으로 수행 할 방법이 없으면 모든 데이터를 하나의 노드로 이동 한 다음 HDFS로 다시 이동해야합니다.

    이를 수행하는 명령 행 스크립틀릿은 다음과 같습니다.

    hadoop fs -text *_fileName.txt | hadoop fs -put - targetFilename.txt
    

    이것은 glob과 일치하는 모든 파일을 표준 출력에 전달한 다음 해당 스트림을 put 명령으로 파이프하고 targetFilename.txt라는 HDFS 파일에 스트림을 출력합니다

    당신이 가지고있는 유일한 문제는 파일명 구조입니다. 너비가 고정되어 있으면 더 쉽게 될 숫자 부분을 zeropadded하지만, 현재 상태에서는 예상치 못한 문단 순서 (1, 10, 100, 1000 , 11, 110, 등) 숫자 순서 (1,2,3,4, 등)보다. 스크립틀릿을 수정하여이 문제를 해결할 수 있습니다.

    hadoop fs -text [0-9]_fileName.txt [0-9][0-9]_fileName.txt \
        [0-9][0-9[0-9]_fileName.txt | hadoop fs -put - targetFilename.txt
    
  2. ==============================

    2.이 작업을 수행하는 org.apache.hadoop.fs.FileUtil.copyMerge API 메소드가 있습니다.

    이 작업을 수행하는 org.apache.hadoop.fs.FileUtil.copyMerge API 메소드가 있습니다.

    public static boolean copyMerge(
                        FileSystem srcFS,
                        Path srcDir,
                        FileSystem dstFS,
                        Path dstFile,
                        boolean deleteSource,
                        Configuration conf,
                        String addString)
    

    srcDir의 모든 파일을 사전 순으로 읽고 dstFile에 내용을 추가합니다.

  3. ==============================

    3.스파크를 사용할 수 있다면. 그것은 마치 할 수있다.

    스파크를 사용할 수 있다면. 그것은 마치 할 수있다.

    sc.textFile("hdfs://...../part*).coalesce(1).saveAsTextFile("hdfs://...../filename)
    

    Spark이 분산 된 방식으로 작동하므로이 노드가 제대로 작동하는지 확인하십시오. 단지주의가 필요 하긴하지만, 파일이 매우 큰 경우 파일을 스파크로 병합하는 것이 느려질 수 있습니다.

  4. ==============================

    4.파일 순서가 중요하고 사전적인 순서가 목적을 수행하지 못하기 때문에이 태스크에 대한 매퍼 (mapper) 프로그램을 작성하는 것이 좋은 후보로 보입니다.이 태스크는 아마도 주기적으로 실행될 수 있습니다. HDFS 맵 작업으로 작성하는 것이 비용 절감 효과가 없다는 장점이 있습니다. 데이터 노드간에 데이터를 많이 이동시키지 않고 이러한 파일을 하나의 출력 파일로 병합 할 수 있기 때문에 효율적입니다. 소스 파일은 HDFS에 있고 매퍼 작업은 데이터 유사성을 시도하므로 서로 다른 데이터 노드에서 파일을 이동하지 않고 파일을 병합 할 수 있습니다.

    파일 순서가 중요하고 사전적인 순서가 목적을 수행하지 못하기 때문에이 태스크에 대한 매퍼 (mapper) 프로그램을 작성하는 것이 좋은 후보로 보입니다.이 태스크는 아마도 주기적으로 실행될 수 있습니다. HDFS 맵 작업으로 작성하는 것이 비용 절감 효과가 없다는 장점이 있습니다. 데이터 노드간에 데이터를 많이 이동시키지 않고 이러한 파일을 하나의 출력 파일로 병합 할 수 있기 때문에 효율적입니다. 소스 파일은 HDFS에 있고 매퍼 작업은 데이터 유사성을 시도하므로 서로 다른 데이터 노드에서 파일을 이동하지 않고 파일을 병합 할 수 있습니다.

    mapper 프로그램은 사용자 정의 InputSplit (입력 디렉토리에 파일 이름을 가져 와서 필요한대로 정렬)과 사용자 정의 InputFormat을 필요로합니다.

    mapper는 hdfs append 또는 byte []로 쓸 수있는 raw 출력 스트림을 사용할 수 있습니다.

    내가 생각하고있는 Mapper 프로그램의 대략적인 스케치는 다음과 같습니다.

    public class MergeOrderedFileMapper extends MapReduceBase implements Mapper<ArrayWritable, Text, ??, ??> 
    {
        FileSystem fs;
    
        public void map(ArrayWritable sourceFiles, Text destFile, OutputCollector<??, ??> output, Reporter reporter) throws IOException 
        {
    
            //Convert the destFile to Path.
            ...
            //make sure the parent directory of destFile is created first.
            FSDataOutputStream destOS = fs.append(destFilePath);
            //Convert the sourceFiles to Paths.
            List<Path> srcPaths;
            ....
            ....
                for(Path p: sourcePaths) {
    
                    FSDataInputStream srcIS = fs.open(p);
                    byte[] fileContent
                    srcIS.read(fileContent);
                    destOS.write(fileContent);
                    srcIS.close();
                    reporter.progress();  // Important, else mapper taks may timeout.
                }
                destOS.close();
    
    
            // Delete source files.
    
            for(Path p: sourcePaths) {
                fs.delete(p, false);
                reporter.progress();
            }
    
        }
    }
    
  5. ==============================

    5.우리가 PySpark를 자주 사용함에 따라 PySpark 구현을 작성했습니다.

    우리가 PySpark를 자주 사용함에 따라 PySpark 구현을 작성했습니다.

    Hadoop의 copyMerge ()를 모델로하고 동일한 하위 레벨 Hadoop API를 사용하여이를 수행합니다.

    https://github.com/Tagar/abalon/blob/v2.3.3/abalon/spark/sparkutils.py#L335

    파일 이름의 사전 순을 유지합니다.

  6. from https://stackoverflow.com/questions/14831117/merging-hdfs-files by cc-by-sa and MIT license