복붙노트

[HADOOP] 스파크를 통해 .zip 파일을 열거 나 스트리밍하는 법?

HADOOP

스파크를 통해 .zip 파일을 열거 나 스트리밍하는 법?

Spark를 통해 'Zip'파일을 열고 싶습니다. Hadoops 네이티브 코덱 지원으로 .gzip 파일을 열 수는 있지만 .zip 파일로는 그렇게 할 수 없습니다.

Spark 코드에서 zip 파일을 읽을 수있는 쉬운 방법이 있습니까? 또한 CompressionCodecFactory에 추가 할 zip 코덱 구현을 검색했지만 지금까지 실패했습니다.

해결법

  1. ==============================

    1.아래 코드를 시도하십시오.

    아래 코드를 시도하십시오.

    using API sparkContext.newAPIHadoopRDD(
        hadoopConf,
        InputFormat.class,
        ImmutableBytesWritable.class, Result.class)
    
  2. ==============================

    2.파이썬 코드가있는 솔루션이 없었고 최근에 pyspark에서 zip을 읽어야했습니다. 그리고 그 일을 찾기 위해이 질문을 보았습니다. 그래서, 이것은 다른 사람들을 도울 것입니다.

    파이썬 코드가있는 솔루션이 없었고 최근에 pyspark에서 zip을 읽어야했습니다. 그리고 그 일을 찾기 위해이 질문을 보았습니다. 그래서, 이것은 다른 사람들을 도울 것입니다.

    import zipfile
    import io
    
    def zip_extract(x):
        in_memory_data = io.BytesIO(x[1])
        file_obj = zipfile.ZipFile(in_memory_data, "r")
        files = [i for i in file_obj.namelist()]
        return dict(zip(files, [file_obj.open(file).read() for file in files]))
    
    
    zips = sc.binaryFiles("hdfs:/Testing/*.zip")
    files_data = zips.map(zip_extract).collect()
    

    위의 코드에서 나는 파일 이름이 zip 인 사전을 키로, 각 텍스트 파일의 텍스트 데이터를 값으로 반환했습니다. 당신은 당신의 목적에 맞게 원하는대로 변경할 수 있습니다.

  3. ==============================

    3.비슷한 문제가 있었는데 다음 코드로 해결했습니다.

    비슷한 문제가 있었는데 다음 코드로 해결했습니다.

    sparkContext.binaryFiles("/pathToZipFiles/*")
    .flatMap { case (zipFilePath, zipContent) =>
    
            val zipInputStream = new ZipInputStream(zipContent.open())
    
            Stream.continually(zipInputStream.getNextEntry)
            .takeWhile(_ != null)
            .flatMap { zipEntry => ??? }
        }
    
  4. ==============================

    4.@ user3591785는 올바른 방향으로 나를 지적 했으므로 올바른 대답으로 표시했습니다.

    @ user3591785는 올바른 방향으로 나를 지적 했으므로 올바른 대답으로 표시했습니다.

    좀 더 자세한 내용을 보려면 ZipFileInputFormat Hadoop을 검색 할 수 있었고이 링크를 발견했습니다. http://cotdp.com/2012/07/hadoop-processing-zip-files-in-mapreduce/

    ZipFileInputFormat과 도우미 ZipfileRecordReader 클래스를 사용하여 Spark을 완벽하게 열고 zip 파일을 읽을 수있었습니다.

        rdd1  = sc.newAPIHadoopFile("/Users/myname/data/compressed/target_file.ZIP", ZipFileInputFormat.class, Text.class, Text.class, new Job().getConfiguration());
    

    결과는 하나의 요소가있는지도였습니다. 파일 이름을 키로, 내용을 값으로 사용하여 이것을 JavaPairRdd로 변환해야했습니다. 원하는 경우 Text를 BytesWritable로 교체하고 ArrayList를 다른 것으로 바꾸 겠지만 내 목표는 먼저 실행하는 것이 었습니다.

    JavaPairRDD<String, String> rdd2 = rdd1.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, String, String>() {
    
        @Override
        public Iterable<Tuple2<String, String>> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
            List<Tuple2<String,String>> newList = new ArrayList<Tuple2<String, String>>();
    
            InputStream is = new ByteArrayInputStream(textTextTuple2._2.getBytes());
            BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
    
            String line;
    
            while ((line = br.readLine()) != null) {
    
            Tuple2 newTuple = new Tuple2(line.split("\\t")[0],line);
                newList.add(newTuple);
            }
            return newList;
        }
    });
    
  5. ==============================

    5.

    using API sparkContext.newAPIHadoopRDD(hadoopConf, InputFormat.class, ImmutableBytesWritable.class, Result.class) 
    

    파일 이름은 conf를 사용하여 전달해야합니다.

    conf=( new Job().getConfiguration())
    conf.set(PROPERTY_NAME from your input formatter,"Zip file address")
    sparkContext.newAPIHadoopRDD(conf, ZipFileInputFormat.class, Text.class, Text.class)
    

    입력 경로에서 PROPERTY_NAME을 (를) 입력 경로에서 찾으십시오.

  6. ==============================

    6.이 대답은 이전 지식 만 수집하고 내 경험을 공유합니다.

    이 대답은 이전 지식 만 수집하고 내 경험을 공유합니다.

    @ Tinku 및 @JeffLL 답변을 따라 가며 가져온 ZipFileInputFormat을 sc.newAPIHadoopFile API와 함께 사용합니다. 그러나 이것은 나를 위해 작동하지 않았다. 그리고 나는 어떻게 필자의 생산 클러스터에 com-cotdp-hadoop 라이브러리를 넣을 것인지 모른다. 나는 설치에 대한 책임을지지 않습니다.

    @Tiago Palma는 좋은 충고를했지만 그는 대답을 끝내지 않았고 압축 해제 된 결과물을 얻으려고 꽤 오랜 시간을 보냈습니다.

    내가 그렇게 할 수있게되었을 때, 나는 나의 대답에서 찾을 수있는 모든 이론적 인면을 준비해야만했다 : https://stackoverflow.com/a/45958182/1549135

    그러나 언급 된 답변의 빠진 부분은 ZipEntry를 읽는 것입니다.

    import java.util.zip.ZipInputStream;
    import java.io.BufferedReader;
    import java.io.InputStreamReader;   
    
    sc.binaryFiles(path, minPartitions)
          .flatMap { case (name: String, content: PortableDataStream) =>
            val zis = new ZipInputStream(content.open)
            Stream.continually(zis.getNextEntry)
                  .takeWhile(_ != null)
                  .flatMap { _ =>
                      val br = new BufferedReader(new InputStreamReader(zis))
                      Stream.continually(br.readLine()).takeWhile(_ != null)
                  }}
    
  7. from https://stackoverflow.com/questions/28569788/how-to-open-stream-zip-files-through-spark by cc-by-sa and MIT license