복붙노트

[HADOOP] Spark SQL에서 CSV로 데이터를 내보내는 방법

HADOOP

Spark SQL에서 CSV로 데이터를 내보내는 방법

이 명령은 HiveQL에서 작동합니다.

insert overwrite directory '/data/home.csv' select * from testtable;

Spark SQL을 사용하면 org.apache.spark.sql.hive.HiveQl 스택 추적에 오류가 발생합니다.

java.lang.RuntimeException: Unsupported language features in query:
    insert overwrite directory '/data/home.csv' select * from testtable

Spark SQL에서 CSV 기능으로 내보내기를 작성하도록 안내해주십시오.

해결법

  1. ==============================

    1.아래 명령문을 사용하여 CSV 형식으로 dataframe의 내용을 쓸 수 있습니다. df.write.csv ( "/ data / home / csv")

    아래 명령문을 사용하여 CSV 형식으로 dataframe의 내용을 쓸 수 있습니다. df.write.csv ( "/ data / home / csv")

    전체 데이터 프레임을 단일 CSV 파일로 작성해야하는 경우 df.coalesce (1) .write.csv ( "/ data / home / sample.csv")

    spark 1.x의 경우 spark-csv를 사용하여 결과를 CSV 파일에 쓸 수 있습니다

    스칼라 스 니펫이 도움이 될 것입니다.

    import org.apache.spark.sql.hive.HiveContext
    // sc - existing spark context
    val sqlContext = new HiveContext(sc)
    val df = sqlContext.sql("SELECT * FROM testtable")
    df.write.format("com.databricks.spark.csv").save("/data/home/csv")
    

    내용을 단일 파일에 쓰려면

    import org.apache.spark.sql.hive.HiveContext
    // sc - existing spark context
    val sqlContext = new HiveContext(sc)
    val df = sqlContext.sql("SELECT * FROM testtable")
    df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv")
    
  2. ==============================

    2.Spark 2.X spark-csv는 원시 데이터 소스로 통합되어 있기 때문에. 따라서 필요한 문은 (창)

    Spark 2.X spark-csv는 원시 데이터 소스로 통합되어 있기 때문에. 따라서 필요한 문은 (창)

    df.write
      .option("header", "true")
      .csv("file:///C:/out.csv")
    

    또는 UNIX

    df.write
      .option("header", "true")
      .csv("/var/out.csv")
    
  3. ==============================

    3.위의 spark-csv의 대답은 정확하지만 문제가 있습니다. 라이브러리는 데이터 프레임 분할을 기반으로 여러 파일을 만듭니다. 그리고 이것은 우리가 일반적으로 필요로하는 것이 아닙니다. 따라서 모든 파티션을 하나로 결합 할 수 있습니다.

    위의 spark-csv의 대답은 정확하지만 문제가 있습니다. 라이브러리는 데이터 프레임 분할을 기반으로 여러 파일을 만듭니다. 그리고 이것은 우리가 일반적으로 필요로하는 것이 아닙니다. 따라서 모든 파티션을 하나로 결합 할 수 있습니다.

    df.coalesce(1).
        write.
        format("com.databricks.spark.csv").
        option("header", "true").
        save("myfile.csv")
    

    lib (name "part-00000")의 출력을 원하는 파일 이름으로 바꿉니다.

    이 블로그 게시물은 자세한 내용을 제공합니다 : https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/

  4. ==============================

    4.가장 간단한 방법은 DataFrame의 RDD를 매핑하고 mkString을 사용하는 것입니다.

    가장 간단한 방법은 DataFrame의 RDD를 매핑하고 mkString을 사용하는 것입니다.

      df.rdd.map(x=>x.mkString(","))
    

    Spark 1.5부터 (또는 그 전에) df.map (r => r.mkString ( ",")) 같은 작업을 수행합니다. CSV 이스케이프를 원한다면 아파치를 사용할 수 있습니다. 예 : 우리가 사용하고있는 코드는 다음과 같습니다.

     def DfToTextFile(path: String,
                       df: DataFrame,
                       delimiter: String = ",",
                       csvEscape: Boolean = true,
                       partitions: Int = 1,
                       compress: Boolean = true,
                       header: Option[String] = None,
                       maxColumnLength: Option[Int] = None) = {
    
        def trimColumnLength(c: String) = {
          val col = maxColumnLength match {
            case None => c
            case Some(len: Int) => c.take(len)
          }
          if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
        }
        def rowToString(r: Row) = {
          val st = r.mkString("~-~").replaceAll("[\\p{C}|\\uFFFD]", "") //remove control characters
          st.split("~-~").map(trimColumnLength).mkString(delimiter)
        }
    
        def addHeader(r: RDD[String]) = {
          val rdd = for (h <- header;
                         if partitions == 1; //headers only supported for single partitions
                         tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
          rdd.getOrElse(r)
        }
    
        val rdd = df.map(rowToString).repartition(partitions)
        val headerRdd = addHeader(rdd)
    
        if (compress)
          headerRdd.saveAsTextFile(path, classOf[GzipCodec])
        else
          headerRdd.saveAsTextFile(path)
      }
    
  5. ==============================

    5.이 오류 메시지는 쿼리 언어에서 지원되는 기능이 아님을 나타냅니다. 그러나 RDD 인터페이스 (df.rdd.saveAsTextFile)를 통해 DataFrame을 평소대로 모든 형식으로 저장할 수 있습니다. 또는 https://github.com/databricks/spark-csv를 확인하십시오.

    이 오류 메시지는 쿼리 언어에서 지원되는 기능이 아님을 나타냅니다. 그러나 RDD 인터페이스 (df.rdd.saveAsTextFile)를 통해 DataFrame을 평소대로 모든 형식으로 저장할 수 있습니다. 또는 https://github.com/databricks/spark-csv를 확인하십시오.

  6. ==============================

    6.spark-csv의 도움으로 CSV 파일에 쓸 수 있습니다.

    spark-csv의 도움으로 CSV 파일에 쓸 수 있습니다.

    val dfsql = sqlContext.sql("select * from tablename")
    dfsql.write.format("com.databricks.spark.csv").option("header","true").save("output.csv")`
    
  7. ==============================

    7.여기에 코드를 입력하십시오. DATAFRAME :

    여기에 코드를 입력하십시오. DATAFRAME :

    val p=spark.read.format("csv").options(Map("header"->"true","delimiter"->"^")).load("filename.csv")
    
  8. from https://stackoverflow.com/questions/31937958/how-to-export-data-from-spark-sql-to-csv by cc-by-sa and MIT license