[HADOOP] Hadoop hdfs 디렉토리에있는 모든 Gzip 파일의 압축을 풉니 다.
HADOOPHadoop hdfs 디렉토리에있는 모든 Gzip 파일의 압축을 풉니 다.
내 HDFS에서 일반적인 형식으로 압축을 풀고 싶은 gzip 파일이 있습니다. 이 일을위한 API가 있습니까? 아니면 어떻게이 함수를 작성할 수 있을까요?
나는 어떤 명령 행 도구도 사용하고 싶지 않다. 대신 Java 코드를 작성하여이 작업을 수행하고 싶습니다.
해결법
-
==============================
1.파일의 압축을 풀려면 CompressionCodec이 필요합니다. gzip 구현은 GzipCodec입니다. 코덱을 통해 CompressedInputStream을 얻고 간단한 IO로 결과를 출력합니다. 이 같은 것 : 파일 file.gz가 있다고 가정 해보십시오.
파일의 압축을 풀려면 CompressionCodec이 필요합니다. gzip 구현은 GzipCodec입니다. 코덱을 통해 CompressedInputStream을 얻고 간단한 IO로 결과를 출력합니다. 이 같은 것 : 파일 file.gz가 있다고 가정 해보십시오.
//path of file String uri = "/uri/to/file.gz"; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); Path inputPath = new Path(uri); CompressionCodecFactory factory = new CompressionCodecFactory(conf); // the correct codec will be discovered by the extension of the file CompressionCodec codec = factory.getCodec(inputPath); if (codec == null) { System.err.println("No codec found for " + uri); System.exit(1); } // remove the .gz extension String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension()); InputStream is = codec.createInputStream(fs.open(inputPath)); OutputStream out = fs.create(new Path(outputUri)); IOUtils.copyBytes(is, out, conf); // close streams
최신 정보
디렉토리에있는 모든 파일을 가져와야하는 경우 FileStatuses를
FileSystem fs = FileSystem.get(new Configuration()); FileStatus[] statuses = fs.listStatus(new Path("hdfs/path/to/dir"));
그런 다음 루프하기 만하면됩니다.
for (FileStatus status: statuses) { CompressionCodec codec = factory.getCodec(status.getPath()); ... InputStream is = codec.createInputStream(fs.open(status.getPath()); ... }
-
==============================
2.나는 압축을 변경하기 위해 Scalding에서 썼던 identity map Hadoop job을 사용한다. / split size / 등을 바꾼다.
나는 압축을 변경하기 위해 Scalding에서 썼던 identity map Hadoop job을 사용한다. / split size / 등을 바꾼다.
class IdentityMap(args: Args) extends ConfiguredJob(args) { CombineFileMultipleTextLine(args.list("in"): _*).read.mapTo[String, String]('line -> 'line)(identity) .write(if (args.boolean("compress")) TsvCompressed(args("out")) else TextLine(args("out"))) }
일반 구성 추상 클래스 :
abstract class ConfiguredJob(args: Args) extends Job(args) { override def config(implicit mode: Mode): Map[AnyRef, AnyRef] = { val Megabyte = 1024 * 1024 val conf = super.config(mode) val splitSizeMax = args.getOrElse("splitSizeMax", "1024").toInt * Megabyte val splitSizeMin = args.getOrElse("splitSizeMin", "512").toInt * Megabyte val jobPriority = args.getOrElse("jobPriority","NORMAL") val maxHeap = args.getOrElse("maxHeap","512m") conf ++ Map("mapred.child.java.opts" -> ("-Xmx" + maxHeap), "mapred.output.compress" -> (if (args.boolean("compress")) "true" else "false"), "mapred.min.split.size" -> splitSizeMin.toString, "mapred.max.split.size" -> splitSizeMax.toString, // "mapred.output.compression.codec" -> args.getOrElse("codec", "org.apache.hadoop.io.compress.BZip2Codec"), //Does not work, has to be -D flag "mapred.job.priority" -> jobPriority) } }
from https://stackoverflow.com/questions/24501402/decompress-all-gzip-files-in-a-hadoop-hdfs-directory by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 람다 아키텍처 - 왜 배치 레이어 (0) | 2019.07.30 |
---|---|
[HADOOP] getCacheFiles ()와 getLocalCacheFiles ()는 같은 것입니까? (0) | 2019.07.30 |
[HADOOP] hadoop aws 버전 호환성 (0) | 2019.07.30 |
[HADOOP] 작은 파일과 128Mb 블록 크기의 HDFS 동작 (0) | 2019.07.30 |
[HADOOP] 지도 파일을 MapReduce 작업의 입력으로 사용 (0) | 2019.07.30 |