복붙노트

[SCALA] 아파치 스파크에서 여러 파일이 포함 된 우편을 읽는 방법

SCALA

아파치 스파크에서 여러 파일이 포함 된 우편을 읽는 방법

여러 개의 텍스트 파일이 포함 된 압축 파일을 데. 나는 각 파일을 읽고 각 파일의 내용을 containining RDD의 목록을 구축하고자합니다.

val test = sc.textFile("/Volumes/work/data/kaggle/dato/test/5.zip")

단지 전체 파일을 것입니다,하지만 어떻게 반복 우편의 각 내용을 통해 다음 RDD에서 같은 불꽃을 사용하여 저장합니다.

나는 스칼라 또는 파이썬 괜찮아요.

스파크를 사용하여 파이썬에서 가능한 솔루션 -

archive = zipfile.ZipFile(archive_path, 'r')
file_paths = zipfile.ZipFile.namelist(archive)
for file_path in file_paths:
    urls = file_path.split("/")
    urlId = urls[-1].split('_')[0]

해결법

  1. ==============================

    1.https://stackoverflow.com/a/45958182/1549135 : 나는 당신이 참조 할 수 있음을, 다른 대답에 필요한 모든 이론을 작성했습니다

    https://stackoverflow.com/a/45958182/1549135 : 나는 당신이 참조 할 수 있음을, 다른 대답에 필요한 모든 이론을 작성했습니다

    나는 @Herman 및 사용 ZipInputStream에 의해 주어진 조언을 따랐다. 이것은 나에게 RDD [문자열] 우편 내용의 반환이 솔루션을 주었다.

    import java.io.{BufferedReader, InputStreamReader}
    import java.util.zip.ZipInputStream
    import org.apache.spark.SparkContext
    import org.apache.spark.input.PortableDataStream
    import org.apache.spark.rdd.RDD
    
    implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {
    
        def readFile(path: String,
                     minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {
    
          if (path.endsWith(".zip")) {
            sc.binaryFiles(path, minPartitions)
              .flatMap { case (name: String, content: PortableDataStream) =>
                val zis = new ZipInputStream(content.open)
                Stream.continually(zis.getNextEntry)
                      .takeWhile {
                          case null => zis.close(); false
                          case _ => true
                      }
                      .flatMap { _ =>
                          val br = new BufferedReader(new InputStreamReader(zis))
                          Stream.continually(br.readLine()).takeWhile(_ != null)
                      }
            }
          } else {
            sc.textFile(path, minPartitions)
          }
        }
      }
    

    단순히 암시 클래스를 가져 와서 사용하고 SparkContext에 ReadFile을 메서드를 호출합니다 :

    import com.github.atais.spark.Implicits.ZipSparkContext
    sc.readFile(path)
    
  2. ==============================

    2.당신이 읽는 경우 바이너리 파일은 sc.binaryFiles를 사용합니다. 이것은 파일 이름과 PortableDataStream을 포함하는 튜플의 RDD를 반환합니다. 당신은 ZipInputStream으로 후자를 공급할 수 있습니다.

    당신이 읽는 경우 바이너리 파일은 sc.binaryFiles를 사용합니다. 이것은 파일 이름과 PortableDataStream을 포함하는 튜플의 RDD를 반환합니다. 당신은 ZipInputStream으로 후자를 공급할 수 있습니다.

  3. ==============================

    3.여기서 (스트림을 폐쇄함으로써 개선이 필요) @Atais 용액의 작업 버전이다 :

    여기서 (스트림을 폐쇄함으로써 개선이 필요) @Atais 용액의 작업 버전이다 :

    implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {
    
    def readFile(path: String,
                 minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {
    
      if (path.toLowerCase.contains("zip")) {
    
        sc.binaryFiles(path, minPartitions)
          .flatMap {
            case (zipFilePath, zipContent) ⇒
              val zipInputStream = new ZipInputStream(zipContent.open())
              Stream.continually(zipInputStream.getNextEntry)
                .takeWhile(_ != null)
                .map { _ ⇒
                  scala.io.Source.fromInputStream(zipInputStream, "UTF-8").getLines.mkString("\n")
                } #::: { zipInputStream.close; Stream.empty[String] }
          }
      } else {
        sc.textFile(path, minPartitions)
      }
    }
    }
    

    모든 당신은이 zip 파일을 읽을 수 따르고해야 할 :

    sc.readFile(path)
    
  4. ==============================

    4.이 첫 번째 행을 필터링합니다. 캔 누구나 공유 통찰력. 나는 압축하는 CSV 파일을 읽고 추가 처리를 위해 JavaRDD을 만들려고하고 있습니다.

    이 첫 번째 행을 필터링합니다. 캔 누구나 공유 통찰력. 나는 압축하는 CSV 파일을 읽고 추가 처리를 위해 JavaRDD을 만들려고하고 있습니다.

    JavaPairRDD<String, PortableDataStream> zipData =
                    sc.binaryFiles("hdfs://temp.zip");
            JavaRDD<Record> newRDDRecord = zipData.flatMap(
              new FlatMapFunction<Tuple2<String, PortableDataStream>, Record>(){
                  public Iterator<Record> call(Tuple2<String,PortableDataStream> content) throws Exception {
                      List<Record> records = new ArrayList<Record>();
                          ZipInputStream zin = new ZipInputStream(content._2.open());
                          ZipEntry zipEntry;
                          while ((zipEntry = zin.getNextEntry()) != null) {
                              count++;
                              if (!zipEntry.isDirectory()) {
                                  Record sd;
                                  String line;
                                  InputStreamReader streamReader = new InputStreamReader(zin);
                                  BufferedReader bufferedReader = new BufferedReader(streamReader);
                                  line = bufferedReader.readLine();
                                  String[] records= new CSVParser().parseLineMulti(line);
                                  sd = new Record(TimeBuilder.convertStringToTimestamp(records[0]),
                                            getDefaultValue(records[1]),
                                            getDefaultValue(records[22]));
                                  records.add(sd);
                              }
                          }
    
                    return records.iterator();
                  }
    
            });
    
  5. ==============================

    5.여기에 나중에 분할 및 그것에서 별도의 스키마를 만드는 데 사용할 수있는 파일 이름을 제공 작동하는 다른 솔루션입니다.

    여기에 나중에 분할 및 그것에서 별도의 스키마를 만드는 데 사용할 수있는 파일 이름을 제공 작동하는 다른 솔루션입니다.

    암시 적 클래스 ZipSparkContext (발 사우스 캐롤라이나 : SparkContext)는 AnyVal를 {확장     데프에서 ReadFile (경로 : 문자열,                  minPartitions : 지능 = sc.defaultMinPartitions) RDD [문자열] = {       경우 (path.toLowerCase.contains ( "우편")) {         sc.binaryFiles (경로 minPartitions)           .flatMap {             케이스 (zipFilePath, zipContent) ⇒               발 zipInputStream 새로운 ZipInputStream을 = (zipContent.open ())               Stream.continually (zipInputStream.getNextEntry)                 .takeWhile (_! = null이)                 .MAP {X ⇒                   브로 파일 이름 1 = x.getName                   scala.io.Source.fromInputStream (zipInputStream, "UTF-8"). getLines.mkString (들 "~ $ {파일 이름 1} \ n") + S를 "~ $ {파일 이름 1}"                 } # ::: {zipInputStream.close; Stream.empty [문자열]}           }       } 다른 {         sc.textFile (경로 minPartitions)       }     }   }

    전체 코드는 여기

    https://github.com/kali786516/Spark2StructuredStreaming/blob/master/src/main/scala/com/dataframe/extraDFExamples/SparkReadZipFiles.scala

  6. from https://stackoverflow.com/questions/32080475/how-to-read-a-zip-containing-multiple-files-in-apache-spark by cc-by-sa and MIT license