복붙노트

[HADOOP] Hadoop hdfs 디렉토리에있는 모든 Gzip 파일의 압축을 풉니 다.

HADOOP

Hadoop hdfs 디렉토리에있는 모든 Gzip 파일의 압축을 풉니 다.

내 HDFS에서 일반적인 형식으로 압축을 풀고 싶은 gzip 파일이 있습니다. 이 일을위한 API가 있습니까? 아니면 어떻게이 함수를 작성할 수 있을까요?

나는 어떤 명령 행 도구도 사용하고 싶지 않다. 대신 Java 코드를 작성하여이 작업을 수행하고 싶습니다.

해결법

  1. ==============================

    1.파일의 압축을 풀려면 CompressionCodec이 필요합니다. gzip 구현은 GzipCodec입니다. 코덱을 통해 CompressedInputStream을 얻고 간단한 IO로 결과를 출력합니다. 이 같은 것 : 파일 file.gz가 있다고 가정 해보십시오.

    파일의 압축을 풀려면 CompressionCodec이 필요합니다. gzip 구현은 GzipCodec입니다. 코덱을 통해 CompressedInputStream을 얻고 간단한 IO로 결과를 출력합니다. 이 같은 것 : 파일 file.gz가 있다고 가정 해보십시오.

    //path of file
    String uri = "/uri/to/file.gz";
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    Path inputPath = new Path(uri);
    
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    // the correct codec will be discovered by the extension of the file
    CompressionCodec codec = factory.getCodec(inputPath);
    
    if (codec == null) {
        System.err.println("No codec found for " + uri);
        System.exit(1);
    }
    
    // remove the .gz extension
    String outputUri =
        CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
    
    InputStream is = codec.createInputStream(fs.open(inputPath));
    OutputStream out = fs.create(new Path(outputUri));
    IOUtils.copyBytes(is, out, conf);
    
    // close streams
    

    최신 정보

    디렉토리에있는 모든 파일을 가져와야하는 경우 FileStatuses를

    FileSystem fs = FileSystem.get(new Configuration());
    FileStatus[] statuses = fs.listStatus(new Path("hdfs/path/to/dir"));
    

    그런 다음 루프하기 만하면됩니다.

    for (FileStatus status: statuses) {
        CompressionCodec codec = factory.getCodec(status.getPath());
        ...
        InputStream is = codec.createInputStream(fs.open(status.getPath());
        ...
    }
    
  2. ==============================

    2.나는 압축을 변경하기 위해 Scalding에서 썼던 identity map Hadoop job을 사용한다. / split size / 등을 바꾼다.

    나는 압축을 변경하기 위해 Scalding에서 썼던 identity map Hadoop job을 사용한다. / split size / 등을 바꾼다.

    class IdentityMap(args: Args) extends ConfiguredJob(args) {
      CombineFileMultipleTextLine(args.list("in"): _*).read.mapTo[String, String]('line -> 'line)(identity)
      .write(if (args.boolean("compress")) TsvCompressed(args("out")) else TextLine(args("out")))
    }
    

    일반 구성 추상 클래스 :

    abstract class ConfiguredJob(args: Args) extends Job(args) {
      override def config(implicit mode: Mode): Map[AnyRef, AnyRef] = {
        val Megabyte = 1024 * 1024
        val conf = super.config(mode)
        val splitSizeMax = args.getOrElse("splitSizeMax", "1024").toInt * Megabyte
        val splitSizeMin = args.getOrElse("splitSizeMin", "512").toInt * Megabyte
        val jobPriority = args.getOrElse("jobPriority","NORMAL")
        val maxHeap = args.getOrElse("maxHeap","512m")
        conf ++ Map("mapred.child.java.opts" -> ("-Xmx" + maxHeap),
          "mapred.output.compress" -> (if (args.boolean("compress")) "true" else "false"),
          "mapred.min.split.size" -> splitSizeMin.toString,
          "mapred.max.split.size" -> splitSizeMax.toString,
    //      "mapred.output.compression.codec" -> args.getOrElse("codec", "org.apache.hadoop.io.compress.BZip2Codec"), //Does not work, has to be -D flag
          "mapred.job.priority" -> jobPriority)
      }
    }
    
  3. from https://stackoverflow.com/questions/24501402/decompress-all-gzip-files-in-a-hadoop-hdfs-directory by cc-by-sa and MIT license