[HADOOP] Avro 파일의 온보드 스키마를 사용하여 Spark에서 Avros를로드하려면 어떻게해야합니까?
HADOOPAvro 파일의 온보드 스키마를 사용하여 Spark에서 Avros를로드하려면 어떻게해야합니까?
나는 Cloudera 소포에서 Spark 0.9.0으로 CDH 4.4를 실행 중입니다.
돼지의 AvroStorage UDF를 통해 만들어진 Avro 파일이 많습니다. Avro 파일에 내장 된 일반 레코드 나 스키마를 사용하여 Spark에서 이러한 파일을로드하려고합니다. 지금까지 나는 이것을 시도했다.
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.commons.lang.StringEscapeUtils.escapeCsv
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import java.net.URI
import java.io.BufferedInputStream
import java.io.File
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.file.DataFileStream
import org.apache.avro.io.DatumReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.mapred.FsInput
val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-00016.avro"
val inURI = new URI(input)
val inPath = new Path(inURI)
val fsInput = new FsInput(inPath, sc.hadoopConfiguration)
val reader = new GenericDatumReader[GenericRecord]
val dataFileReader = DataFileReader.openReader(fsInput, reader)
val schemaString = dataFileReader.getSchema
val buf = scala.collection.mutable.ListBuffer.empty[GenericRecord]
while(dataFileReader.hasNext) {
buf += dataFileReader.next
}
sc.parallelize(buf)
이것은 하나의 파일에서 작동하지만 확장 할 수는 없습니다 - 모든 데이터를 로컬 RAM에로드 한 다음 거기에서 스파크 노드를 통해 배포합니다.
해결법
-
==============================
1.내 자신의 질문에 대답하려면 :
내 자신의 질문에 대답하려면 :
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.avro.generic.GenericRecord import org.apache.avro.mapred.AvroKey import org.apache.avro.mapred.AvroInputFormat import org.apache.avro.mapreduce.AvroKeyInputFormat import org.apache.hadoop.io.NullWritable import org.apache.commons.lang.StringEscapeUtils.escapeCsv import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.conf.Configuration import java.io.BufferedInputStream import org.apache.avro.file.DataFileStream import org.apache.avro.io.DatumReader import org.apache.avro.file.DataFileReader import org.apache.avro.file.DataFileReader import org.apache.avro.generic.{GenericDatumReader, GenericRecord} import org.apache.avro.mapred.FsInput import org.apache.avro.Schema import org.apache.avro.Schema.Parser import org.apache.hadoop.mapred.JobConf import java.io.File import java.net.URI // spark-shell -usejavacp -classpath "*.jar" val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-00016.avro" val jobConf= new JobConf(sc.hadoopConfiguration) val rdd = sc.hadoopFile( input, classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]], classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]], classOf[org.apache.hadoop.io.NullWritable], 10) val f1 = rdd.first val a = f1._1.datum a.get("rawLog") // Access avro fields
-
==============================
2.이 작품은 나를 위해 :
이 작품은 나를 위해 :
import org.apache.avro.generic.GenericRecord import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper} import org.apache.hadoop.io.NullWritable ... val path = "hdfs:///path/to/your/avro/folder" val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)
from https://stackoverflow.com/questions/23944615/how-can-i-load-avros-in-spark-using-the-schema-on-board-the-avro-files by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] Hadoop Streaming에서 "typedbytes"또는 "rawbytes"를 사용하는 방법은 무엇입니까? (0) | 2019.07.07 |
---|---|
[HADOOP] HDFS에서 blockName의 파일을 찾는 방법 hadoop (0) | 2019.07.07 |
[HADOOP] java.util.Map의 드롭 인 대체품 찾기 (0) | 2019.07.06 |
[HADOOP] Apache Spark on YARN : 많은 수의 입력 데이터 파일 (스파크의 여러 입력 파일 결합) (0) | 2019.07.06 |
[HADOOP] Hadoop을 실행할 때 OutOfMemoryException을 피하는 방법? (0) | 2019.07.06 |