복붙노트

[HADOOP] `ssc.fileStream ()`을 사용하여 마루 파일을 읽는 법? `ssc.fileStream ()`에 전달되는 타입은 무엇입니까?

HADOOP

`ssc.fileStream ()`을 사용하여 마루 파일을 읽는 법? `ssc.fileStream ()`에 전달되는 타입은 무엇입니까?

Spark의 fileStream () 메서드에 대한 나의 이해는 키, 값 및 형식의 세 가지 유형을 매개 변수로 사용한다는 것입니다. 텍스트 파일의 경우 적절한 유형은 LongWritable, Text 및 TextInputFormat입니다.

먼저, 이러한 유형의 본질을 이해하고 싶습니다. 직관적으로, 필자는이 경우 키가 파일의 줄 번호이고 값이 해당 줄의 텍스트라고 추측합니다. 따라서 텍스트 파일의 다음 예제에서 :

Hello
Test
Another Test

DStream의 첫 번째 행에는 키 1 ​​(0?)과 값 Hello가 있습니다.

이 올바른지?

내 질문의 두 번째 부분 : ParquetInputFormat의 디 컴파일 된 구현을 살펴보면 흥미로운 점을 발견했습니다.

public class ParquetInputFormat<T>
       extends FileInputFormat<Void, T> {
//...

public class TextInputFormat
       extends FileInputFormat<LongWritable, Text>
       implements JobConfigurable {
//...

TextInputFormat은 LongWritable 및 Text 유형의 FileInputFormat을 확장하지만 ParquetInputFormat는 Void 및 T 유형의 동일한 클래스를 확장합니다.

이것은 내 마루 데이터의 전체 행을 보유하기 위해 Value 클래스를 작성한 다음 > 유형을 ssc.fileStream ()에 전달해야한다는 의미입니까?

그렇다면 MyClass를 어떻게 구현해야합니까?

편집 1 : ParquetInputFormat 개체에 전달 될 readSupportClass 것으로 나타났습니다. 어떤 종류의 클래스이며, 파르 케 파일을 파싱하는 데 어떻게 사용됩니까? 이 문제를 다루는 문서가 있습니까?

편집 2 : 내가 말할 수있는 한, 이것은 불가능합니다. 누구든지 스파크에 마루 파일을 스트리밍하는 방법을 알고 있다면 공유해주세요.

해결법

  1. ==============================

    1.Spark Streaming의 쪽모퉁이 파일을 읽는 샘플은 다음과 같습니다.

    Spark Streaming의 쪽모퉁이 파일을 읽는 샘플은 다음과 같습니다.

    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "parquet.avro.AvroReadSupport")
    val stream = ssc.fileStream[Void, GenericRecord, ParquetInputFormat[GenericRecord]](
      directory, { path: Path => path.toString.endsWith("parquet") }, true, ssc.sparkContext.hadoopConfiguration)
    
    val lines = stream.map(row => {
      println("row:" + row.toString())
      row
    })
    

    몇 가지 요점은 ...

    샘플을 만들기 위해 아래 소스 코드를 참조했습니다. 그리고 저는 또한 좋은 모범을 찾을 수 없었습니다. 나는 더 나은 것을 기다리고 싶다.

    https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala https://github.com/Parquet/parquet-mr/blob/master/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala

  2. ==============================

    2.

    val ssc = new StreamingContext(conf, Seconds(5))
    var schema =StructType(Seq(
          StructField("a", StringType, nullable = false),
          ........
    
         ))
    val schemaJson=schema.json
    
    val fileDir="/tmp/fileDir"
    ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport")  ssc.sparkContext.hadoopConfiguration.set("org.apache.spark.sql.parquet.row.requested_schema", schemaJson)
    ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_BINARY_AS_STRING.key, "false")
    ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, "false")
    ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, "false")
    ssc.sparkContext.hadoopConfiguration.set(SQLConf.PARQUET_BINARY_AS_STRING.key, "false")
    
    val streamRdd = ssc.fileStream[Void, UnsafeRow, ParquetInputFormat[UnsafeRow]](fileDir,(t: org.apache.hadoop.fs.Path) => true, false)
    
    streamRdd.count().print()
    
    ssc.start()
    ssc.awaitTermination()
    

    그건 그렇고, 나는 2.1.0 스파크를 사용하고있다.

  3. from https://stackoverflow.com/questions/35413552/how-to-read-parquet-files-using-ssc-filestream-what-are-the-types-passed-to by cc-by-sa and MIT license