[HADOOP] Apache Spark 및 Java를 사용하여 CSV를 DataFrame / DataSet으로 구문 분석
HADOOPApache Spark 및 Java를 사용하여 CSV를 DataFrame / DataSet으로 구문 분석
나는 불꽃이 생기고, CSV (한 줄에 고용 됨)에서 다음을 찾으려면 group-by & reduce를 사용하고 싶습니다.
Department, Designation, costToCompany, State
Sales, Trainee, 12000, UP
Sales, Lead, 32000, AP
Sales, Lead, 32000, LA
Sales, Lead, 32000, TN
Sales, Lead, 32000, AP
Sales, Lead, 32000, TN
Sales, Lead, 32000, LA
Sales, Lead, 32000, LA
Marketing, Associate, 18000, TN
Marketing, Associate, 18000, TN
HR, Manager, 58000, TN
Department, Designation, State별로 CSV를 단순화하고 싶습니다. sum (costToCompany) 및 TotalEmployeeCount
다음과 같은 결과를 얻어야합니다.
Dept, Desg, state, empCount, totalCost
Sales,Lead,AP,2,64000
Sales,Lead,LA,3,96000
Sales,Lead,TN,2,64000
변환과 작업을 사용하여이를 달성 할 수있는 방법이 있습니까? 아니면 RDD 작업을해야합니까?
해결법
-
==============================
1.이 시점에서 두 가지 접근법이 있습니다.
이 시점에서 두 가지 접근법이 있습니다.
-
==============================
2.
import org.apache.spark.sql.SparkSession; SparkSession spark = SparkSession .builder() .appName("Java Spark SQL Example") .getOrCreate();
import org.apache.spark.sql.types.StructType; StructType schema = new StructType() .add("department", "string") .add("designation", "string") .add("ctc", "long") .add("state", "string");
Dataset<Row> df = spark.read() .option("mode", "DROPMALFORMED") .schema(schema) .csv("hdfs://path/input.csv");
CSV 파일에서 데이터를 읽는 옵션 더보기
"org.apache.spark" % "spark-core_2.11" % "2.0.0" "org.apache.spark" % "spark-sql_2.11" % "2.0.0"
-
==============================
3.다음은 완전히 정확하지는 않지만 데이터를 저글링하는 방법에 대한 아이디어를 제공해야합니다. 그것은 꽤 아니 예, 클래스 등으로 대체해야하지만 스파크 API를 사용하는 방법의 빠른 예를 들어, 나는 그것이 충분 희망 :)
다음은 완전히 정확하지는 않지만 데이터를 저글링하는 방법에 대한 아이디어를 제공해야합니다. 그것은 꽤 아니 예, 클래스 등으로 대체해야하지만 스파크 API를 사용하는 방법의 빠른 예를 들어, 나는 그것이 충분 희망 :)
val rawlines = sc.textfile("hdfs://.../*.csv") case class Employee(dep: String, des: String, cost: Double, state: String) val employees = rawlines .map(_.split(",") /*or use a proper CSV parser*/ .map( Employee(row(0), row(1), row(2), row(3) ) # the 1 is the amount of employees (which is obviously 1 per line) val keyVals = employees.map( em => (em.dep, em.des, em.state), (1 , em.cost)) val results = keyVals.reduceByKey{ a,b => (a._1 + b._1, b._1, b._2) # (a.count + b.count , a.cost + b.cost ) } #debug output results.take(100).foreach(println) results .map( keyval => someThingToFormatAsCsvStringOrWhatever ) .saveAsTextFile("hdfs://.../results")
또는 SparkSQL을 사용할 수 있습니다.
val sqlContext = new SQLContext(sparkContext) # case classes can easily be registered as tables employees.registerAsTable("employees") val results = sqlContext.sql("""select dep, des, state, sum(cost), count(*) from employees group by dep,des,state"""
-
==============================
4.JSON의 경우 텍스트 파일에 한 줄에 하나의 JSON 객체가 있으면 sqlContext.jsonFile (path)을 사용하여 Spark SQL에서이를 SchemaRDD로로드 할 수 있습니다 (스키마가 자동으로 유추됩니다). 그런 다음이를 테이블로 등록하고 SQL로 u 리할 수 있습니다. 텍스트 파일을 레코드 당 하나의 JSON 객체가 들어있는 RDD [String]로 수동으로로드하고 sqlContext.jsonRDD (rdd)를 사용하여 SchemaRDD로 설정할 수 있습니다. jsonRDD는 데이터를 사전 처리해야 할 때 유용합니다.
JSON의 경우 텍스트 파일에 한 줄에 하나의 JSON 객체가 있으면 sqlContext.jsonFile (path)을 사용하여 Spark SQL에서이를 SchemaRDD로로드 할 수 있습니다 (스키마가 자동으로 유추됩니다). 그런 다음이를 테이블로 등록하고 SQL로 u 리할 수 있습니다. 텍스트 파일을 레코드 당 하나의 JSON 객체가 들어있는 RDD [String]로 수동으로로드하고 sqlContext.jsonRDD (rdd)를 사용하여 SchemaRDD로 설정할 수 있습니다. jsonRDD는 데이터를 사전 처리해야 할 때 유용합니다.
from https://stackoverflow.com/questions/25362942/parse-csv-as-dataframe-dataset-with-apache-spark-and-java by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] Hadoop HDFS에 이미지 / 비디오 저장 (0) | 2019.06.01 |
---|---|
[HADOOP] Hbase 신속하게 행 개수를 계산 (0) | 2019.06.01 |
[HADOOP] 하이브에 구조체의 배열 분해 (0) | 2019.06.01 |
[HADOOP] Google의 Dremel은 무엇인가요? Mapreduce와 다른 점은 무엇입니까? (0) | 2019.06.01 |
[HADOOP] 원격 파일을 로컬 디스크에 복사하지 않고 hadoop에 저장 (0) | 2019.06.01 |