복붙노트

[HADOOP] Spark - CSV 파일을 DataFrame으로로드 하시겠습니까?

HADOOP

Spark - CSV 파일을 DataFrame으로로드 하시겠습니까?

spark CSV를 읽고 DataFrame으로 변환하여 df.registerTempTable ( "table_name")을 사용하여 HDFS에 저장하고 싶습니다.

scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Apache Spark에서 CSV 파일을 DataFrame으로로드하는 올바른 명령은 무엇입니까?

해결법

  1. ==============================

    1.spark-csv는 핵심 Spark 기능의 일부이며 별도의 라이브러리가 필요하지 않습니다. 예를 들어 할 수 있습니다.

    spark-csv는 핵심 Spark 기능의 일부이며 별도의 라이브러리가 필요하지 않습니다. 예를 들어 할 수 있습니다.

    df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
    

    스칼라에서는 (이 형식은 구분 기호로 구분 된 모든 형식에서 사용할 수 있습니다. ","CSV "는"tsv "의 경우"\ t ") val df = sqlContext.read.format ( "com.databricks.spark.csv")     .option ( "구분 기호", ",")     .load ( "csvfile.csv")

  2. ==============================

    2.먼저 SparkSession 객체를 초기화합니다.이 객체는 기본적으로 spark

    먼저 SparkSession 객체를 초기화합니다.이 객체는 기본적으로 spark

    val spark = org.apache.spark.sql.SparkSession.builder
            .master("local")
            .appName("Spark CSV Reader")
            .getOrCreate;
    
     val df = spark.read
             .format("csv")
             .option("header", "true") //first line in file has headers
             .option("mode", "DROPMALFORMED")
             .load("hdfs:///csv/file/dir/file.csv")
    
     val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")
    

    종속성 :

     "org.apache.spark" % "spark-core_2.11" % 2.0.0,
     "org.apache.spark" % "spark-sql_2.11" % 2.0.0,
    
    val df = sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", "true") 
        .option("mode", "DROPMALFORMED")
        .load("csv/file/path"); 
    

    종속성 :

    "org.apache.spark" % "spark-sql_2.10" % 1.6.0,
    "com.databricks" % "spark-csv_2.10" % 1.6.0,
    "com.univocity" % "univocity-parsers" % LATEST,
    
  3. ==============================

    3.하둡이 2.6이고 스파크가 1.6이고 "databricks"패키지가 없다.

    하둡이 2.6이고 스파크가 1.6이고 "databricks"패키지가 없다.

    import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
    import org.apache.spark.sql.Row;
    
    val csv = sc.textFile("/path/to/file.csv")
    val rows = csv.map(line => line.split(",").map(_.trim))
    val header = rows.first
    val data = rows.filter(_(0) != header(0))
    val rdd = data.map(row => Row(row(0),row(1).toInt))
    
    val schema = new StructType()
        .add(StructField("id", StringType, true))
        .add(StructField("val", IntegerType, true))
    
    val df = sqlContext.createDataFrame(rdd, schema)
    
  4. ==============================

    4.Spark 2.0에서는 다음과 같은 방법으로 CSV를 읽을 수 있습니다.

    Spark 2.0에서는 다음과 같은 방법으로 CSV를 읽을 수 있습니다.

    val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
    val sc = new SparkContext(conf)
    val sparkSession = SparkSession.builder
      .config(conf = conf)
      .appName("spark session example")
      .getOrCreate()
    
    val path = "/Users/xxx/Downloads/usermsg.csv"
    val base_df = sparkSession.read.option("header","true").
      csv(path)
    
  5. ==============================

    5.Java 1.8에서 CSV 파일을 완벽하게 읽는 코드 스 니펫

    Java 1.8에서 CSV 파일을 완벽하게 읽는 코드 스 니펫

    POM.HML

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>2.0.0</version>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.10</artifactId>
        <version>1.4.0</version>
    </dependency>
    

    자바

    SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
    // create Spark Context
    SparkContext context = new SparkContext(conf);
    // create spark Session
    SparkSession sparkSession = new SparkSession(context);
    
    Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
    
            //("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
    System.out.println("========== Print Schema ============");
    df.printSchema();
    System.out.println("========== Print Data ==============");
    df.show();
    System.out.println("========== Print title ==============");
    df.select("title").show();
    
  6. ==============================

    6.Penny 's Spark 2 예제는 spark2에서이를 수행하는 방법입니다. 또 하나의 트릭이 있습니다 : inferSchema 옵션을 true로 설정하여 데이터의 초기 스캔을 수행하여 헤더를 생성하십시오.

    Penny 's Spark 2 예제는 spark2에서이를 수행하는 방법입니다. 또 하나의 트릭이 있습니다 : inferSchema 옵션을 true로 설정하여 데이터의 초기 스캔을 수행하여 헤더를 생성하십시오.

    여기에서 스파크가 설정 한 스파크 세션이라면 S3의 아마존 호스트 인 모든 랜드 셋 이미지의 CSV 인덱스 파일을로드하는 작업입니다.

      /*
       * Licensed to the Apache Software Foundation (ASF) under one or more
       * contributor license agreements.  See the NOTICE file distributed with
       * this work for additional information regarding copyright ownership.
       * The ASF licenses this file to You under the Apache License, Version 2.0
       * (the "License"); you may not use this file except in compliance with
       * the License.  You may obtain a copy of the License at
       *
       *    http://www.apache.org/licenses/LICENSE-2.0
       *
       * Unless required by applicable law or agreed to in writing, software
       * distributed under the License is distributed on an "AS IS" BASIS,
       * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       * See the License for the specific language governing permissions and
       * limitations under the License.
       */
    
    val csvdata = spark.read.options(Map(
        "header" -> "true",
        "ignoreLeadingWhiteSpace" -> "true",
        "ignoreTrailingWhiteSpace" -> "true",
        "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
        "inferSchema" -> "true",
        "mode" -> "FAILFAST"))
      .csv("s3a://landsat-pds/scene_list.gz")
    

    나쁜 소식은 파일을 통한 검색을 트리거하는 것입니다. 이 20 + MB 압축 CSV 파일과 같이 크기가 커서 장거리 연결시 30 초가 걸릴 수 있습니다. 염두에 두어야 할 점은 일단 스키마를 입력하면 수동으로 코딩하는 것이 좋습니다.

    (코드 스 니펫 Apache Software License 2.0은 모든 모호함을 피하기 위해 라이센스를 받았고, S3 통합의 데모 / 통합 테스트로 수행 한 작업)

  7. ==============================

    7.CSV 파일을 파싱하는 데는 많은 어려움이 있습니다. 열 값에 영어 / 이스케이프 / 구분 기호 / 기타 문자가 있으면 파싱 오류가 발생할 수 있으므로 파일 크기가 커지면 더할 수 없습니다.

    CSV 파일을 파싱하는 데는 많은 어려움이 있습니다. 열 값에 영어 / 이스케이프 / 구분 기호 / 기타 문자가 있으면 파싱 오류가 발생할 수 있으므로 파일 크기가 커지면 더할 수 없습니다.

    그러면 마법은 사용되는 옵션에 있습니다. 저와 희망을 위해 일한 것들이 대부분의 경우를 다루어야합니다.

    ### Create a Spark Session
    spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()
    
    ### Note the options that are used. You may have to tweak these in case of error
    html_df = spark.read.csv(html_csv_file_path, 
                             header=True, 
                             multiLine=True, 
                             ignoreLeadingWhiteSpace=True, 
                             ignoreTrailingWhiteSpace=True, 
                             encoding="UTF-8",
                             sep=',',
                             quote='"', 
                             escape='"',
                             maxColumns=2,
                             inferSchema=True)
    

    희망이 도움이됩니다. 자세한 내용은 PySpark 2를 사용하여 HTML 소스 코드가있는 CSV를 읽으십시오.

    참고 : 위의 코드는 Spark 2 API에서 가져온 것입니다. CSV 파일 읽기 API는 Spark installable의 기본 제공 패키지와 함께 번들로 제공됩니다.

    참고 : PySpark는 Spark 용 Python 래퍼이며 Scala / Java와 동일한 API를 공유합니다.

  8. ==============================

    8.스칼라 2.11 및 아파치 2.0 이상을 사용하여 항아리를 만드는 경우.

    스칼라 2.11 및 아파치 2.0 이상을 사용하여 항아리를 만드는 경우.

    sqlContext 또는 sparkContext 객체를 만들 필요가 없습니다. SparkSession 객체만으로 모든 요구 사항을 만족시킬 수 있습니다.

    다음은 잘 동작하는 mycode입니다 :

    import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
    import org.apache.log4j.{Level, LogManager, Logger}
    
    object driver {
    
      def main(args: Array[String]) {
    
        val log = LogManager.getRootLogger
    
        log.info("**********JAR EXECUTION STARTED**********")
    
        val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate()
        val df = spark.read.format("csv")
          .option("header", "true")
          .option("delimiter","|")
          .option("inferSchema","true")
          .load("d:/small_projects/spark/test.pos")
        df.show()
      }
    }
    

    클러스터에서 실행중인 경우 spasterBuilder 객체를 정의하는 동안 .master ( "yarn") 만 .master ( "local")로 변경하십시오.

    Spark Doc에서는 다음과 같은 내용을 다룹니다. https://spark.apache.org/docs/2.2.0/sql-programming-guide.html

  9. ==============================

    9.기본 파일 형식은 Parquet with spark.read ..이고 파일 읽기 csv는 왜 예외가 발생 하는지를 나타냅니다. 사용하려는 API에 csv 형식 지정

    기본 파일 형식은 Parquet with spark.read ..이고 파일 읽기 csv는 왜 예외가 발생 하는지를 나타냅니다. 사용하려는 API에 csv 형식 지정

  10. ==============================

    10.CSV 파일을로드하고 결과를 DataFrame으로 반환합니다.

    CSV 파일을로드하고 결과를 DataFrame으로 반환합니다.

    df=sparksession.read.option("header", true).csv("file_name.csv")
    

    Dataframe에서 파일을 CSV 형식으로 처리했습니다.

  11. from https://stackoverflow.com/questions/29704333/spark-load-csv-file-as-dataframe by cc-by-sa and MIT license