[HADOOP] Spark SQL에서 CSV로 데이터를 내보내는 방법
HADOOPSpark SQL에서 CSV로 데이터를 내보내는 방법
이 명령은 HiveQL에서 작동합니다.
insert overwrite directory '/data/home.csv' select * from testtable;
Spark SQL을 사용하면 org.apache.spark.sql.hive.HiveQl 스택 추적에 오류가 발생합니다.
java.lang.RuntimeException: Unsupported language features in query:
insert overwrite directory '/data/home.csv' select * from testtable
Spark SQL에서 CSV 기능으로 내보내기를 작성하도록 안내해주십시오.
해결법
-
==============================
1.아래 명령문을 사용하여 CSV 형식으로 dataframe의 내용을 쓸 수 있습니다. df.write.csv ( "/ data / home / csv")
아래 명령문을 사용하여 CSV 형식으로 dataframe의 내용을 쓸 수 있습니다. df.write.csv ( "/ data / home / csv")
전체 데이터 프레임을 단일 CSV 파일로 작성해야하는 경우 df.coalesce (1) .write.csv ( "/ data / home / sample.csv")
spark 1.x의 경우 spark-csv를 사용하여 결과를 CSV 파일에 쓸 수 있습니다
스칼라 스 니펫이 도움이 될 것입니다.
import org.apache.spark.sql.hive.HiveContext // sc - existing spark context val sqlContext = new HiveContext(sc) val df = sqlContext.sql("SELECT * FROM testtable") df.write.format("com.databricks.spark.csv").save("/data/home/csv")
내용을 단일 파일에 쓰려면
import org.apache.spark.sql.hive.HiveContext // sc - existing spark context val sqlContext = new HiveContext(sc) val df = sqlContext.sql("SELECT * FROM testtable") df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv")
-
==============================
2.Spark 2.X spark-csv는 원시 데이터 소스로 통합되어 있기 때문에. 따라서 필요한 문은 (창)
Spark 2.X spark-csv는 원시 데이터 소스로 통합되어 있기 때문에. 따라서 필요한 문은 (창)
df.write .option("header", "true") .csv("file:///C:/out.csv")
또는 UNIX
df.write .option("header", "true") .csv("/var/out.csv")
-
==============================
3.위의 spark-csv의 대답은 정확하지만 문제가 있습니다. 라이브러리는 데이터 프레임 분할을 기반으로 여러 파일을 만듭니다. 그리고 이것은 우리가 일반적으로 필요로하는 것이 아닙니다. 따라서 모든 파티션을 하나로 결합 할 수 있습니다.
위의 spark-csv의 대답은 정확하지만 문제가 있습니다. 라이브러리는 데이터 프레임 분할을 기반으로 여러 파일을 만듭니다. 그리고 이것은 우리가 일반적으로 필요로하는 것이 아닙니다. 따라서 모든 파티션을 하나로 결합 할 수 있습니다.
df.coalesce(1). write. format("com.databricks.spark.csv"). option("header", "true"). save("myfile.csv")
lib (name "part-00000")의 출력을 원하는 파일 이름으로 바꿉니다.
이 블로그 게시물은 자세한 내용을 제공합니다 : https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/
-
==============================
4.가장 간단한 방법은 DataFrame의 RDD를 매핑하고 mkString을 사용하는 것입니다.
가장 간단한 방법은 DataFrame의 RDD를 매핑하고 mkString을 사용하는 것입니다.
df.rdd.map(x=>x.mkString(","))
Spark 1.5부터 (또는 그 전에) df.map (r => r.mkString ( ",")) 같은 작업을 수행합니다. CSV 이스케이프를 원한다면 아파치를 사용할 수 있습니다. 예 : 우리가 사용하고있는 코드는 다음과 같습니다.
def DfToTextFile(path: String, df: DataFrame, delimiter: String = ",", csvEscape: Boolean = true, partitions: Int = 1, compress: Boolean = true, header: Option[String] = None, maxColumnLength: Option[Int] = None) = { def trimColumnLength(c: String) = { val col = maxColumnLength match { case None => c case Some(len: Int) => c.take(len) } if (csvEscape) StringEscapeUtils.escapeCsv(col) else col } def rowToString(r: Row) = { val st = r.mkString("~-~").replaceAll("[\\p{C}|\\uFFFD]", "") //remove control characters st.split("~-~").map(trimColumnLength).mkString(delimiter) } def addHeader(r: RDD[String]) = { val rdd = for (h <- header; if partitions == 1; //headers only supported for single partitions tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1) rdd.getOrElse(r) } val rdd = df.map(rowToString).repartition(partitions) val headerRdd = addHeader(rdd) if (compress) headerRdd.saveAsTextFile(path, classOf[GzipCodec]) else headerRdd.saveAsTextFile(path) }
-
==============================
5.이 오류 메시지는 쿼리 언어에서 지원되는 기능이 아님을 나타냅니다. 그러나 RDD 인터페이스 (df.rdd.saveAsTextFile)를 통해 DataFrame을 평소대로 모든 형식으로 저장할 수 있습니다. 또는 https://github.com/databricks/spark-csv를 확인하십시오.
이 오류 메시지는 쿼리 언어에서 지원되는 기능이 아님을 나타냅니다. 그러나 RDD 인터페이스 (df.rdd.saveAsTextFile)를 통해 DataFrame을 평소대로 모든 형식으로 저장할 수 있습니다. 또는 https://github.com/databricks/spark-csv를 확인하십시오.
-
==============================
6.spark-csv의 도움으로 CSV 파일에 쓸 수 있습니다.
spark-csv의 도움으로 CSV 파일에 쓸 수 있습니다.
val dfsql = sqlContext.sql("select * from tablename") dfsql.write.format("com.databricks.spark.csv").option("header","true").save("output.csv")`
-
==============================
7.여기에 코드를 입력하십시오. DATAFRAME :
여기에 코드를 입력하십시오. DATAFRAME :
val p=spark.read.format("csv").options(Map("header"->"true","delimiter"->"^")).load("filename.csv")
from https://stackoverflow.com/questions/31937958/how-to-export-data-from-spark-sql-to-csv by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 'hadoop fs -head'쉘 명령이없는 이유는 무엇입니까? (0) | 2019.06.06 |
---|---|
[HADOOP] 스파크에서 RDD 란 무엇입니까? (0) | 2019.06.06 |
[HADOOP] Hadoop에서 노드를 올바르게 제거하려면 어떻게해야합니까? (0) | 2019.06.05 |
[HADOOP] SQL과 같은 하이브 삽입 쿼리 (0) | 2019.06.05 |
[HADOOP] 하이브에서 COLLECT_SET ()을 사용하여 중복을 유지 하시겠습니까? (0) | 2019.06.05 |