복붙노트

[HADOOP] Hadoop 3.0에서 CopyMerge를 수행하는 방법은 무엇입니까?

HADOOP

Hadoop 3.0에서 CopyMerge를 수행하는 방법은 무엇입니까?

나는 hadoop 버전 2.7의 FileUtil이 여러 파일을 새로운 파일로 병합하는 copyMerge 기능을 가지고 있다는 것을 알고있다.

그러나 copyMerge 함수는 3.0 버전의 API에서 더 이상 지원되지 않습니다.

hadoop 3.0 버전에서 디렉토리 내의 모든 파일을 새로운 단일 파일로 병합하는 방법에 대한 아이디어가 있습니까?

해결법

  1. ==============================

    1.FileUtil # copyMerge 메서드가 제거되었습니다. 주요 변경 사항에 대한 자세한 내용보기 :

    FileUtil # copyMerge 메서드가 제거되었습니다. 주요 변경 사항에 대한 자세한 내용보기 :

    https://issues.apache.org/jira/browse/HADOOP-12967

    https://issues.apache.org/jira/browse/HADOOP-11392

    getmerge를 사용할 수 있습니다.

    사용법 : hadoop fs -getmerge [-nl]

    소스 디렉토리와 대상 파일을 입력 받아 src의 파일을 대상 로컬 파일에 연결합니다. 선택적으로 -nl을 설정하여 각 파일의 끝에 개행 문자 (LF)를 추가 할 수 있습니다. -skip-empty-file은 빈 파일의 경우 원하지 않는 개행 문자를 피할 수 있습니다.

    예 :

    hadoop fs -getmerge -nl /src /opt/output.txt
    hadoop fs -getmerge -nl /src/file1.txt /src/file2.txt /output.txt
    

    종료 코드 : 성공시 0을, 에러시 0이 아닌 값을 반환합니다.

    https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/FileSystemShell.html#getmerge

  2. ==============================

    2.나는 같은 질문을 가지고 있었고 copyMerge를 다시 구현해야했다. (하지만 PySpark에서는 원래의 copyMerge와 동일한 API 호출을 사용한다).

    나는 같은 질문을 가지고 있었고 copyMerge를 다시 구현해야했다. (하지만 PySpark에서는 원래의 copyMerge와 동일한 API 호출을 사용한다).

    Hadoop 3에 동등한 기능이없는 이유를 모릅니다. 우리는 HDFS 디렉토리의 파일을 HDFS 파일로 자주 병합해야합니다.

    위에서 언급 한 pySpark의 구현 방법은 다음과 같습니다. https://github.com/Tagar/stuff/blob/master/copyMerge.py

  3. ==============================

    3.FileUtil.copyMerge ()가 버전 3으로 시작하는 API에서 더 이상 사용되지 않고 제거되었으므로 간단한 해결책은 직접 다시 구현하는 것입니다.

    FileUtil.copyMerge ()가 버전 3으로 시작하는 API에서 더 이상 사용되지 않고 제거되었으므로 간단한 해결책은 직접 다시 구현하는 것입니다.

    다음은 이전 버전의 Java 원래 구현입니다.

    스칼라 재 작성은 다음과 같습니다.

    import scala.util.Try
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.io.IOUtils
    import java.io.IOException
    
    def copyMerge(
        srcFS: FileSystem, srcDir: Path,
        dstFS: FileSystem, dstFile: Path,
        deleteSource: Boolean, conf: Configuration
    ): Boolean = {
    
      if (dstFS.exists(dstFile))
        throw new IOException(s"Target $dstFile already exists")
    
      // Source path is expected to be a directory:
      if (srcFS.getFileStatus(srcDir).isDirectory()) {
    
        val outputFile = dstFS.create(dstFile)
        Try {
          srcFS
            .listStatus(srcDir)
            .sortBy(_.getPath.getName)
            .collect {
              case status if status.isFile() =>
                val inputFile = srcFS.open(status.getPath())
                Try(IOUtils.copyBytes(inputFile, outputFile, conf, false))
                inputFile.close()
            }
        }
        outputFile.close()
    
        if (deleteSource) srcFS.delete(srcDir, true) else true
      }
      else false
    }
    
  4. from https://stackoverflow.com/questions/42035735/how-to-do-copymerge-in-hadoop-3-0 by cc-by-sa and MIT license