[HADOOP] 스파크를 통해 .zip 파일을 열거 나 스트리밍하는 법?
HADOOP스파크를 통해 .zip 파일을 열거 나 스트리밍하는 법?
Spark를 통해 'Zip'파일을 열고 싶습니다. Hadoops 네이티브 코덱 지원으로 .gzip 파일을 열 수는 있지만 .zip 파일로는 그렇게 할 수 없습니다.
Spark 코드에서 zip 파일을 읽을 수있는 쉬운 방법이 있습니까? 또한 CompressionCodecFactory에 추가 할 zip 코덱 구현을 검색했지만 지금까지 실패했습니다.
해결법
-
==============================
1.아래 코드를 시도하십시오.
아래 코드를 시도하십시오.
using API sparkContext.newAPIHadoopRDD( hadoopConf, InputFormat.class, ImmutableBytesWritable.class, Result.class)
-
==============================
2.파이썬 코드가있는 솔루션이 없었고 최근에 pyspark에서 zip을 읽어야했습니다. 그리고 그 일을 찾기 위해이 질문을 보았습니다. 그래서, 이것은 다른 사람들을 도울 것입니다.
파이썬 코드가있는 솔루션이 없었고 최근에 pyspark에서 zip을 읽어야했습니다. 그리고 그 일을 찾기 위해이 질문을 보았습니다. 그래서, 이것은 다른 사람들을 도울 것입니다.
import zipfile import io def zip_extract(x): in_memory_data = io.BytesIO(x[1]) file_obj = zipfile.ZipFile(in_memory_data, "r") files = [i for i in file_obj.namelist()] return dict(zip(files, [file_obj.open(file).read() for file in files])) zips = sc.binaryFiles("hdfs:/Testing/*.zip") files_data = zips.map(zip_extract).collect()
위의 코드에서 나는 파일 이름이 zip 인 사전을 키로, 각 텍스트 파일의 텍스트 데이터를 값으로 반환했습니다. 당신은 당신의 목적에 맞게 원하는대로 변경할 수 있습니다.
-
==============================
3.비슷한 문제가 있었는데 다음 코드로 해결했습니다.
비슷한 문제가 있었는데 다음 코드로 해결했습니다.
sparkContext.binaryFiles("/pathToZipFiles/*") .flatMap { case (zipFilePath, zipContent) => val zipInputStream = new ZipInputStream(zipContent.open()) Stream.continually(zipInputStream.getNextEntry) .takeWhile(_ != null) .flatMap { zipEntry => ??? } }
-
==============================
4.@ user3591785는 올바른 방향으로 나를 지적 했으므로 올바른 대답으로 표시했습니다.
@ user3591785는 올바른 방향으로 나를 지적 했으므로 올바른 대답으로 표시했습니다.
좀 더 자세한 내용을 보려면 ZipFileInputFormat Hadoop을 검색 할 수 있었고이 링크를 발견했습니다. http://cotdp.com/2012/07/hadoop-processing-zip-files-in-mapreduce/
ZipFileInputFormat과 도우미 ZipfileRecordReader 클래스를 사용하여 Spark을 완벽하게 열고 zip 파일을 읽을 수있었습니다.
rdd1 = sc.newAPIHadoopFile("/Users/myname/data/compressed/target_file.ZIP", ZipFileInputFormat.class, Text.class, Text.class, new Job().getConfiguration());
결과는 하나의 요소가있는지도였습니다. 파일 이름을 키로, 내용을 값으로 사용하여 이것을 JavaPairRdd로 변환해야했습니다. 원하는 경우 Text를 BytesWritable로 교체하고 ArrayList를 다른 것으로 바꾸 겠지만 내 목표는 먼저 실행하는 것이 었습니다.
JavaPairRDD<String, String> rdd2 = rdd1.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, String, String>() { @Override public Iterable<Tuple2<String, String>> call(Tuple2<Text, Text> textTextTuple2) throws Exception { List<Tuple2<String,String>> newList = new ArrayList<Tuple2<String, String>>(); InputStream is = new ByteArrayInputStream(textTextTuple2._2.getBytes()); BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8")); String line; while ((line = br.readLine()) != null) { Tuple2 newTuple = new Tuple2(line.split("\\t")[0],line); newList.add(newTuple); } return newList; } });
-
==============================
5.
using API sparkContext.newAPIHadoopRDD(hadoopConf, InputFormat.class, ImmutableBytesWritable.class, Result.class)
파일 이름은 conf를 사용하여 전달해야합니다.
conf=( new Job().getConfiguration()) conf.set(PROPERTY_NAME from your input formatter,"Zip file address") sparkContext.newAPIHadoopRDD(conf, ZipFileInputFormat.class, Text.class, Text.class)
입력 경로에서 PROPERTY_NAME을 (를) 입력 경로에서 찾으십시오.
-
==============================
6.이 대답은 이전 지식 만 수집하고 내 경험을 공유합니다.
이 대답은 이전 지식 만 수집하고 내 경험을 공유합니다.
@ Tinku 및 @JeffLL 답변을 따라 가며 가져온 ZipFileInputFormat을 sc.newAPIHadoopFile API와 함께 사용합니다. 그러나 이것은 나를 위해 작동하지 않았다. 그리고 나는 어떻게 필자의 생산 클러스터에 com-cotdp-hadoop 라이브러리를 넣을 것인지 모른다. 나는 설치에 대한 책임을지지 않습니다.
@Tiago Palma는 좋은 충고를했지만 그는 대답을 끝내지 않았고 압축 해제 된 결과물을 얻으려고 꽤 오랜 시간을 보냈습니다.
내가 그렇게 할 수있게되었을 때, 나는 나의 대답에서 찾을 수있는 모든 이론적 인면을 준비해야만했다 : https://stackoverflow.com/a/45958182/1549135
그러나 언급 된 답변의 빠진 부분은 ZipEntry를 읽는 것입니다.
import java.util.zip.ZipInputStream; import java.io.BufferedReader; import java.io.InputStreamReader; sc.binaryFiles(path, minPartitions) .flatMap { case (name: String, content: PortableDataStream) => val zis = new ZipInputStream(content.open) Stream.continually(zis.getNextEntry) .takeWhile(_ != null) .flatMap { _ => val br = new BufferedReader(new InputStreamReader(zis)) Stream.continually(br.readLine()).takeWhile(_ != null) }}
from https://stackoverflow.com/questions/28569788/how-to-open-stream-zip-files-through-spark by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] Oozie : Oozie <java> 액션에서 Map-Reduce를 시작 하시겠습니까? (0) | 2019.05.29 |
---|---|
[HADOOP] hadoop.mapred 대 hadoop.mapreduce? (0) | 2019.05.29 |
[HADOOP] 하이브에 데이터를 조 변경 / 피벗하는 방법은 무엇입니까? (0) | 2019.05.28 |
[HADOOP] Hadoop에서 작업을 줄이는시기는 언제 시작합니까? (0) | 2019.05.28 |
[HADOOP] Spark에서지도 작업의 ID를 얻는 방법? (0) | 2019.05.28 |