[HADOOP] Spark - CSV 파일을 DataFrame으로로드 하시겠습니까?
HADOOPSpark - CSV 파일을 DataFrame으로로드 하시겠습니까?
spark CSV를 읽고 DataFrame으로 변환하여 df.registerTempTable ( "table_name")을 사용하여 HDFS에 저장하고 싶습니다.
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Apache Spark에서 CSV 파일을 DataFrame으로로드하는 올바른 명령은 무엇입니까?
해결법
-
==============================
1.spark-csv는 핵심 Spark 기능의 일부이며 별도의 라이브러리가 필요하지 않습니다. 예를 들어 할 수 있습니다.
spark-csv는 핵심 Spark 기능의 일부이며 별도의 라이브러리가 필요하지 않습니다. 예를 들어 할 수 있습니다.
df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
스칼라에서는 (이 형식은 구분 기호로 구분 된 모든 형식에서 사용할 수 있습니다. ","CSV "는"tsv "의 경우"\ t ") val df = sqlContext.read.format ( "com.databricks.spark.csv") .option ( "구분 기호", ",") .load ( "csvfile.csv")
-
==============================
2.먼저 SparkSession 객체를 초기화합니다.이 객체는 기본적으로 spark
먼저 SparkSession 객체를 초기화합니다.이 객체는 기본적으로 spark
val spark = org.apache.spark.sql.SparkSession.builder .master("local") .appName("Spark CSV Reader") .getOrCreate;
val df = spark.read .format("csv") .option("header", "true") //first line in file has headers .option("mode", "DROPMALFORMED") .load("hdfs:///csv/file/dir/file.csv")
val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")
종속성 :
"org.apache.spark" % "spark-core_2.11" % 2.0.0, "org.apache.spark" % "spark-sql_2.11" % 2.0.0,
val df = sqlContext.read .format("com.databricks.spark.csv") .option("header", "true") .option("mode", "DROPMALFORMED") .load("csv/file/path");
종속성 :
"org.apache.spark" % "spark-sql_2.10" % 1.6.0, "com.databricks" % "spark-csv_2.10" % 1.6.0, "com.univocity" % "univocity-parsers" % LATEST,
-
==============================
3.하둡이 2.6이고 스파크가 1.6이고 "databricks"패키지가 없다.
하둡이 2.6이고 스파크가 1.6이고 "databricks"패키지가 없다.
import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}; import org.apache.spark.sql.Row; val csv = sc.textFile("/path/to/file.csv") val rows = csv.map(line => line.split(",").map(_.trim)) val header = rows.first val data = rows.filter(_(0) != header(0)) val rdd = data.map(row => Row(row(0),row(1).toInt)) val schema = new StructType() .add(StructField("id", StringType, true)) .add(StructField("val", IntegerType, true)) val df = sqlContext.createDataFrame(rdd, schema)
-
==============================
4.Spark 2.0에서는 다음과 같은 방법으로 CSV를 읽을 수 있습니다.
Spark 2.0에서는 다음과 같은 방법으로 CSV를 읽을 수 있습니다.
val conf = new SparkConf().setMaster("local[2]").setAppName("my app") val sc = new SparkContext(conf) val sparkSession = SparkSession.builder .config(conf = conf) .appName("spark session example") .getOrCreate() val path = "/Users/xxx/Downloads/usermsg.csv" val base_df = sparkSession.read.option("header","true"). csv(path)
-
==============================
5.Java 1.8에서 CSV 파일을 완벽하게 읽는 코드 스 니펫
Java 1.8에서 CSV 파일을 완벽하게 읽는 코드 스 니펫
POM.HML
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.0.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>2.0.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.8</version> </dependency> <dependency> <groupId>com.databricks</groupId> <artifactId>spark-csv_2.10</artifactId> <version>1.4.0</version> </dependency>
자바
SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local"); // create Spark Context SparkContext context = new SparkContext(conf); // create spark Session SparkSession sparkSession = new SparkSession(context); Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv"); //("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv"); System.out.println("========== Print Schema ============"); df.printSchema(); System.out.println("========== Print Data =============="); df.show(); System.out.println("========== Print title =============="); df.select("title").show();
-
==============================
6.Penny 's Spark 2 예제는 spark2에서이를 수행하는 방법입니다. 또 하나의 트릭이 있습니다 : inferSchema 옵션을 true로 설정하여 데이터의 초기 스캔을 수행하여 헤더를 생성하십시오.
Penny 's Spark 2 예제는 spark2에서이를 수행하는 방법입니다. 또 하나의 트릭이 있습니다 : inferSchema 옵션을 true로 설정하여 데이터의 초기 스캔을 수행하여 헤더를 생성하십시오.
여기에서 스파크가 설정 한 스파크 세션이라면 S3의 아마존 호스트 인 모든 랜드 셋 이미지의 CSV 인덱스 파일을로드하는 작업입니다.
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ val csvdata = spark.read.options(Map( "header" -> "true", "ignoreLeadingWhiteSpace" -> "true", "ignoreTrailingWhiteSpace" -> "true", "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ", "inferSchema" -> "true", "mode" -> "FAILFAST")) .csv("s3a://landsat-pds/scene_list.gz")
나쁜 소식은 파일을 통한 검색을 트리거하는 것입니다. 이 20 + MB 압축 CSV 파일과 같이 크기가 커서 장거리 연결시 30 초가 걸릴 수 있습니다. 염두에 두어야 할 점은 일단 스키마를 입력하면 수동으로 코딩하는 것이 좋습니다.
(코드 스 니펫 Apache Software License 2.0은 모든 모호함을 피하기 위해 라이센스를 받았고, S3 통합의 데모 / 통합 테스트로 수행 한 작업)
-
==============================
7.CSV 파일을 파싱하는 데는 많은 어려움이 있습니다. 열 값에 영어 / 이스케이프 / 구분 기호 / 기타 문자가 있으면 파싱 오류가 발생할 수 있으므로 파일 크기가 커지면 더할 수 없습니다.
CSV 파일을 파싱하는 데는 많은 어려움이 있습니다. 열 값에 영어 / 이스케이프 / 구분 기호 / 기타 문자가 있으면 파싱 오류가 발생할 수 있으므로 파일 크기가 커지면 더할 수 없습니다.
그러면 마법은 사용되는 옵션에 있습니다. 저와 희망을 위해 일한 것들이 대부분의 경우를 다루어야합니다.
### Create a Spark Session spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate() ### Note the options that are used. You may have to tweak these in case of error html_df = spark.read.csv(html_csv_file_path, header=True, multiLine=True, ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True, encoding="UTF-8", sep=',', quote='"', escape='"', maxColumns=2, inferSchema=True)
희망이 도움이됩니다. 자세한 내용은 PySpark 2를 사용하여 HTML 소스 코드가있는 CSV를 읽으십시오.
참고 : 위의 코드는 Spark 2 API에서 가져온 것입니다. CSV 파일 읽기 API는 Spark installable의 기본 제공 패키지와 함께 번들로 제공됩니다.
참고 : PySpark는 Spark 용 Python 래퍼이며 Scala / Java와 동일한 API를 공유합니다.
-
==============================
8.스칼라 2.11 및 아파치 2.0 이상을 사용하여 항아리를 만드는 경우.
스칼라 2.11 및 아파치 2.0 이상을 사용하여 항아리를 만드는 경우.
sqlContext 또는 sparkContext 객체를 만들 필요가 없습니다. SparkSession 객체만으로 모든 요구 사항을 만족시킬 수 있습니다.
다음은 잘 동작하는 mycode입니다 :
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession} import org.apache.log4j.{Level, LogManager, Logger} object driver { def main(args: Array[String]) { val log = LogManager.getRootLogger log.info("**********JAR EXECUTION STARTED**********") val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate() val df = spark.read.format("csv") .option("header", "true") .option("delimiter","|") .option("inferSchema","true") .load("d:/small_projects/spark/test.pos") df.show() } }
클러스터에서 실행중인 경우 spasterBuilder 객체를 정의하는 동안 .master ( "yarn") 만 .master ( "local")로 변경하십시오.
Spark Doc에서는 다음과 같은 내용을 다룹니다. https://spark.apache.org/docs/2.2.0/sql-programming-guide.html
-
==============================
9.기본 파일 형식은 Parquet with spark.read ..이고 파일 읽기 csv는 왜 예외가 발생 하는지를 나타냅니다. 사용하려는 API에 csv 형식 지정
기본 파일 형식은 Parquet with spark.read ..이고 파일 읽기 csv는 왜 예외가 발생 하는지를 나타냅니다. 사용하려는 API에 csv 형식 지정
-
==============================
10.CSV 파일을로드하고 결과를 DataFrame으로 반환합니다.
CSV 파일을로드하고 결과를 DataFrame으로 반환합니다.
df=sparksession.read.option("header", true).csv("file_name.csv")
Dataframe에서 파일을 CSV 형식으로 처리했습니다.
from https://stackoverflow.com/questions/29704333/spark-load-csv-file-as-dataframe by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 축소 단계 후에 출력 파일 병합 (0) | 2019.05.26 |
---|---|
[HADOOP] Apache Spark의 기본 키 (0) | 2019.05.26 |
[HADOOP] Hadoop 프로세스 레코드는 블록 경계에서 어떻게 분리됩니까? (0) | 2019.05.26 |
[HADOOP] 키로 여러 출력에 쓰기 Spark - 하나의 Spark 작업 (0) | 2019.05.26 |
[HADOOP] Hadoop "플랫폼에 native-hadoop 라이브러리를로드 할 수 없습니다"경고 (0) | 2019.05.26 |