[SCALA] 아파치 스파크에서 여러 파일이 포함 된 우편을 읽는 방법
SCALA아파치 스파크에서 여러 파일이 포함 된 우편을 읽는 방법
여러 개의 텍스트 파일이 포함 된 압축 파일을 데. 나는 각 파일을 읽고 각 파일의 내용을 containining RDD의 목록을 구축하고자합니다.
val test = sc.textFile("/Volumes/work/data/kaggle/dato/test/5.zip")
단지 전체 파일을 것입니다,하지만 어떻게 반복 우편의 각 내용을 통해 다음 RDD에서 같은 불꽃을 사용하여 저장합니다.
나는 스칼라 또는 파이썬 괜찮아요.
스파크를 사용하여 파이썬에서 가능한 솔루션 -
archive = zipfile.ZipFile(archive_path, 'r')
file_paths = zipfile.ZipFile.namelist(archive)
for file_path in file_paths:
urls = file_path.split("/")
urlId = urls[-1].split('_')[0]
해결법
-
==============================
1.https://stackoverflow.com/a/45958182/1549135 : 나는 당신이 참조 할 수 있음을, 다른 대답에 필요한 모든 이론을 작성했습니다
https://stackoverflow.com/a/45958182/1549135 : 나는 당신이 참조 할 수 있음을, 다른 대답에 필요한 모든 이론을 작성했습니다
나는 @Herman 및 사용 ZipInputStream에 의해 주어진 조언을 따랐다. 이것은 나에게 RDD [문자열] 우편 내용의 반환이 솔루션을 주었다.
import java.io.{BufferedReader, InputStreamReader} import java.util.zip.ZipInputStream import org.apache.spark.SparkContext import org.apache.spark.input.PortableDataStream import org.apache.spark.rdd.RDD implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal { def readFile(path: String, minPartitions: Int = sc.defaultMinPartitions): RDD[String] = { if (path.endsWith(".zip")) { sc.binaryFiles(path, minPartitions) .flatMap { case (name: String, content: PortableDataStream) => val zis = new ZipInputStream(content.open) Stream.continually(zis.getNextEntry) .takeWhile { case null => zis.close(); false case _ => true } .flatMap { _ => val br = new BufferedReader(new InputStreamReader(zis)) Stream.continually(br.readLine()).takeWhile(_ != null) } } } else { sc.textFile(path, minPartitions) } } }
단순히 암시 클래스를 가져 와서 사용하고 SparkContext에 ReadFile을 메서드를 호출합니다 :
import com.github.atais.spark.Implicits.ZipSparkContext sc.readFile(path)
-
==============================
2.당신이 읽는 경우 바이너리 파일은 sc.binaryFiles를 사용합니다. 이것은 파일 이름과 PortableDataStream을 포함하는 튜플의 RDD를 반환합니다. 당신은 ZipInputStream으로 후자를 공급할 수 있습니다.
당신이 읽는 경우 바이너리 파일은 sc.binaryFiles를 사용합니다. 이것은 파일 이름과 PortableDataStream을 포함하는 튜플의 RDD를 반환합니다. 당신은 ZipInputStream으로 후자를 공급할 수 있습니다.
-
==============================
3.여기서 (스트림을 폐쇄함으로써 개선이 필요) @Atais 용액의 작업 버전이다 :
여기서 (스트림을 폐쇄함으로써 개선이 필요) @Atais 용액의 작업 버전이다 :
implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal { def readFile(path: String, minPartitions: Int = sc.defaultMinPartitions): RDD[String] = { if (path.toLowerCase.contains("zip")) { sc.binaryFiles(path, minPartitions) .flatMap { case (zipFilePath, zipContent) ⇒ val zipInputStream = new ZipInputStream(zipContent.open()) Stream.continually(zipInputStream.getNextEntry) .takeWhile(_ != null) .map { _ ⇒ scala.io.Source.fromInputStream(zipInputStream, "UTF-8").getLines.mkString("\n") } #::: { zipInputStream.close; Stream.empty[String] } } } else { sc.textFile(path, minPartitions) } } }
모든 당신은이 zip 파일을 읽을 수 따르고해야 할 :
sc.readFile(path)
-
==============================
4.이 첫 번째 행을 필터링합니다. 캔 누구나 공유 통찰력. 나는 압축하는 CSV 파일을 읽고 추가 처리를 위해 JavaRDD을 만들려고하고 있습니다.
이 첫 번째 행을 필터링합니다. 캔 누구나 공유 통찰력. 나는 압축하는 CSV 파일을 읽고 추가 처리를 위해 JavaRDD을 만들려고하고 있습니다.
JavaPairRDD<String, PortableDataStream> zipData = sc.binaryFiles("hdfs://temp.zip"); JavaRDD<Record> newRDDRecord = zipData.flatMap( new FlatMapFunction<Tuple2<String, PortableDataStream>, Record>(){ public Iterator<Record> call(Tuple2<String,PortableDataStream> content) throws Exception { List<Record> records = new ArrayList<Record>(); ZipInputStream zin = new ZipInputStream(content._2.open()); ZipEntry zipEntry; while ((zipEntry = zin.getNextEntry()) != null) { count++; if (!zipEntry.isDirectory()) { Record sd; String line; InputStreamReader streamReader = new InputStreamReader(zin); BufferedReader bufferedReader = new BufferedReader(streamReader); line = bufferedReader.readLine(); String[] records= new CSVParser().parseLineMulti(line); sd = new Record(TimeBuilder.convertStringToTimestamp(records[0]), getDefaultValue(records[1]), getDefaultValue(records[22])); records.add(sd); } } return records.iterator(); } });
-
==============================
5.여기에 나중에 분할 및 그것에서 별도의 스키마를 만드는 데 사용할 수있는 파일 이름을 제공 작동하는 다른 솔루션입니다.
여기에 나중에 분할 및 그것에서 별도의 스키마를 만드는 데 사용할 수있는 파일 이름을 제공 작동하는 다른 솔루션입니다.
암시 적 클래스 ZipSparkContext (발 사우스 캐롤라이나 : SparkContext)는 AnyVal를 {확장 데프에서 ReadFile (경로 : 문자열, minPartitions : 지능 = sc.defaultMinPartitions) RDD [문자열] = { 경우 (path.toLowerCase.contains ( "우편")) { sc.binaryFiles (경로 minPartitions) .flatMap { 케이스 (zipFilePath, zipContent) ⇒ 발 zipInputStream 새로운 ZipInputStream을 = (zipContent.open ()) Stream.continually (zipInputStream.getNextEntry) .takeWhile (_! = null이) .MAP {X ⇒ 브로 파일 이름 1 = x.getName scala.io.Source.fromInputStream (zipInputStream, "UTF-8"). getLines.mkString (들 "~ $ {파일 이름 1} \ n") + S를 "~ $ {파일 이름 1}" } # ::: {zipInputStream.close; Stream.empty [문자열]} } } 다른 { sc.textFile (경로 minPartitions) } } }
전체 코드는 여기
https://github.com/kali786516/Spark2StructuredStreaming/blob/master/src/main/scala/com/dataframe/extraDFExamples/SparkReadZipFiles.scala
from https://stackoverflow.com/questions/32080475/how-to-read-a-zip-containing-multiple-files-in-apache-spark by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] SBT의 DependsOn RootProject는 : 종속성을 컴파일되지 않습니다 (0) | 2019.11.24 |
---|---|
[SCALA] 스칼라는 일반적인 특성과 개체를 확장 (0) | 2019.11.24 |
[SCALA] 어떻게 아파치 스파크에서 RowMatrix의 역을 계산하기? (0) | 2019.11.24 |
[SCALA] SPARK에서 왼쪽 및 오른쪽 키워드를 사용하는 방법 SQL (0) | 2019.11.24 |
[SCALA] 타입에서 TypeTag을 얻기? (0) | 2019.11.24 |