복붙노트

[HADOOP] Avro 파일의 온보드 스키마를 사용하여 Spark에서 Avros를로드하려면 어떻게해야합니까?

HADOOP

Avro 파일의 온보드 스키마를 사용하여 Spark에서 Avros를로드하려면 어떻게해야합니까?

나는 Cloudera 소포에서 Spark 0.9.0으로 CDH 4.4를 실행 중입니다.

돼지의 AvroStorage UDF를 통해 만들어진 Avro 파일이 많습니다. Avro 파일에 내장 된 일반 레코드 나 스키마를 사용하여 Spark에서 이러한 파일을로드하려고합니다. 지금까지 나는 이것을 시도했다.

import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.commons.lang.StringEscapeUtils.escapeCsv

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import java.net.URI
import java.io.BufferedInputStream
import java.io.File
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.file.DataFileStream
import org.apache.avro.io.DatumReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.mapred.FsInput

val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-00016.avro"
val inURI = new URI(input)
val inPath = new Path(inURI)

val fsInput = new FsInput(inPath, sc.hadoopConfiguration)
val reader =  new GenericDatumReader[GenericRecord]
val dataFileReader = DataFileReader.openReader(fsInput, reader)
val schemaString = dataFileReader.getSchema

val buf = scala.collection.mutable.ListBuffer.empty[GenericRecord]
while(dataFileReader.hasNext)  {
  buf += dataFileReader.next
}
sc.parallelize(buf)

이것은 하나의 파일에서 작동하지만 확장 할 수는 없습니다 - 모든 데이터를 로컬 RAM에로드 한 다음 거기에서 스파크 노드를 통해 배포합니다.

해결법

  1. ==============================

    1.내 자신의 질문에 대답하려면 :

    내 자신의 질문에 대답하려면 :

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    
    import org.apache.avro.generic.GenericRecord
    import org.apache.avro.mapred.AvroKey
    import org.apache.avro.mapred.AvroInputFormat
    import org.apache.avro.mapreduce.AvroKeyInputFormat
    import org.apache.hadoop.io.NullWritable
    import org.apache.commons.lang.StringEscapeUtils.escapeCsv
    
    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.conf.Configuration
    import java.io.BufferedInputStream
    import org.apache.avro.file.DataFileStream
    import org.apache.avro.io.DatumReader
    import org.apache.avro.file.DataFileReader
    import org.apache.avro.file.DataFileReader
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
    import org.apache.avro.mapred.FsInput
    import org.apache.avro.Schema
    import org.apache.avro.Schema.Parser
    import org.apache.hadoop.mapred.JobConf
    import java.io.File
    import java.net.URI
    
    // spark-shell -usejavacp -classpath "*.jar"
    
    val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-00016.avro"
    
    val jobConf= new JobConf(sc.hadoopConfiguration)
    val rdd = sc.hadoopFile(
      input,
      classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
      classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
      classOf[org.apache.hadoop.io.NullWritable],
      10)
    val f1 = rdd.first
    val a = f1._1.datum
    a.get("rawLog") // Access avro fields
    
  2. ==============================

    2.이 작품은 나를 위해 :

    이 작품은 나를 위해 :

    import org.apache.avro.generic.GenericRecord
    import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
    import org.apache.hadoop.io.NullWritable
    
    ...
    val path = "hdfs:///path/to/your/avro/folder"
    val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)
    
  3. from https://stackoverflow.com/questions/23944615/how-can-i-load-avros-in-spark-using-the-schema-on-board-the-avro-files by cc-by-sa and MIT license