[SCALA] 스파크-CSV를 사용하여 하나의 CSV 파일을 쓰기
SCALA스파크-CSV를 사용하여 하나의 CSV 파일을 쓰기
나는 하나의 CSV를 쓰기 위해 노력하고 있어요,하지만 수 없습니다,이 폴더를 만들고하는 https://github.com/databricks/spark-csv을 사용하고 있습니다.
경로와 파일 이름과 같은 매개 변수를 사용하고 CSV 파일을 작성하는 스칼라 기능이 필요합니다.
해결법
-
==============================
1.각 파티션을 개별적으로 저장되어 있기 때문에, 여러 개의 파일과 폴더를 만드는 것입니다. 당신은 당신이 다시 분할 수 (폴더에 계속) 단일 출력 파일이 필요하면 (업스트림 데이터가 크지 만 셔플을 필요로하는 경우 선호) :
각 파티션을 개별적으로 저장되어 있기 때문에, 여러 개의 파일과 폴더를 만드는 것입니다. 당신은 당신이 다시 분할 수 (폴더에 계속) 단일 출력 파일이 필요하면 (업스트림 데이터가 크지 만 셔플을 필요로하는 경우 선호) :
df .repartition(1) .write.format("com.databricks.spark.csv") .option("header", "true") .save("mydata.csv")
또는 병합 :
df .coalesce(1) .write.format("com.databricks.spark.csv") .option("header", "true") .save("mydata.csv")
저장하기 전에 데이터 프레임 :
모든 데이터는 / 부품-00000 mydata.csv에 기록됩니다. 이 옵션을 사용하기 전에 당신이 가서 한 근로자에 모든 데이터를 전송하는 비용이 무엇 무엇인지 이해해야합니다. 한 직원에게 먼저 가져와 이후에 스토리지 노드에 분산 - 당신이 복제 분산 파일 시스템을 사용하는 경우, 데이터는 여러 번 옮겨진 것입니다.
또는 당신은 그대로 코드를 떠나 단순히 이후 모든 부분을 병합 고양이 또는 HDFS의 getmerge 같은 범용 도구를 사용할 수 있습니다.
-
==============================
2.당신은 HDFS와 스파크를 실행하는 경우, 나는 일반적으로 CSV 파일을 작성하고 병합을 할 HDFS를 활용하여 문제를 해결했습니다. 내가 직접 스파크 (1.6)에서 그 일을 해요 :
당신은 HDFS와 스파크를 실행하는 경우, 나는 일반적으로 CSV 파일을 작성하고 병합을 할 HDFS를 활용하여 문제를 해결했습니다. 내가 직접 스파크 (1.6)에서 그 일을 해요 :
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ def merge(srcPath: String, dstPath: String): Unit = { val hadoopConfig = new Configuration() val hdfs = FileSystem.get(hadoopConfig) FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) // the "true" setting deletes the source files once they are merged into the new output } val newData = << create your dataframe >> val outputfile = "/user/feeds/project/outputs/subject" var filename = "myinsights" var outputFileName = outputfile + "/temp_" + filename var mergedFileName = outputfile + "/merged_" + filename var mergeFindGlob = outputFileName newData.write .format("com.databricks.spark.csv") .option("header", "false") .mode("overwrite") .save(outputFileName) merge(mergeFindGlob, mergedFileName ) newData.unpersist()
나는이 트릭을 배운 위치를 기억 할 수 없습니다,하지만 당신을 위해 일한다 수도 있습니다.
-
==============================
3.나는 늦게 여기에 게임에 조금 수 있지만 수 (1) 또는 재분할 (1) 작은 데이터 세트의 작동 할 수 있지만 큰 데이터 세트를 모두 하나 개의 노드에 하나 개의 파티션에 던져 질 것이다 유착 사용. 이 천천히 과정을 기껏 OOM 오류 또는를 던질 가능성이 높습니다.
나는 늦게 여기에 게임에 조금 수 있지만 수 (1) 또는 재분할 (1) 작은 데이터 세트의 작동 할 수 있지만 큰 데이터 세트를 모두 하나 개의 노드에 하나 개의 파티션에 던져 질 것이다 유착 사용. 이 천천히 과정을 기껏 OOM 오류 또는를 던질 가능성이 높습니다.
난 당신이 하둡 API에서 FileUtil.copyMerge () 함수를 사용하는 것이 매우를 제안했다. 이것은 하나의 파일로 출력을 병합합니다.
편집 -이 효과적으로 드라이버보다는 집행 노드에 데이터를 제공합니다. 하나의 집행자가 드라이버보다 사용하기 위해 더 많은 RAM이있는 경우 합체는 () 잘 될 것입니다.
EDIT 2 copyMerge ()는 3.0 하둡 제거되고있다. 최신 버전에서 작동하는 방법에 대한 자세한 내용은 다음 스택 오버플로 문서를 참조하십시오 하둡은 하둡 3.0 CopyMerge을 수행하는 방법
-
==============================
4.당신이 Databricks을 사용하고 하나의 작업자에 RAM에 모든 데이터를 넣을 수있는 경우 (.coalesce를 사용하여과 (1)), 당신은 찾아 결과 CSV 파일을 이동하는 DBFS을 사용할 수 있습니다 :
당신이 Databricks을 사용하고 하나의 작업자에 RAM에 모든 데이터를 넣을 수있는 경우 (.coalesce를 사용하여과 (1)), 당신은 찾아 결과 CSV 파일을 이동하는 DBFS을 사용할 수 있습니다 :
val fileprefix= "/mnt/aws/path/file-prefix" dataset .coalesce(1) .write //.mode("overwrite") // I usually don't use this, but you may want to. .option("header", "true") .option("delimiter","\t") .csv(fileprefix+".tmp") val partition_path = dbutils.fs.ls(fileprefix+".tmp/") .filter(file=>file.name.endsWith(".csv"))(0).path dbutils.fs.cp(partition_path,fileprefix+".tab") dbutils.fs.rm(fileprefix+".tmp",recurse=true)
파일이 작업자에 RAM에 맞지 않을 경우, 당신은 FileUtils.copyMerge를 사용하는 chaotic3quilibrium의 제안을 고려할 수 있습니다 (). 나는이 일을하지 않은, 그리고 S3에, 예를 들면, 수인지 아닌지 아직 모른다.
이 답변은이 질문에 대한 이전 답변뿐만 아니라 제공되는 코드의 내 자신의 테스트에 내장되어 있습니다. 나는 원래 Databricks에 게시 여기 다시 게시하고있다.
내가 찾은 DBFS의 RM의 재귀 옵션의 가장 좋은 문서는 Databricks 포럼에 있습니다.
-
==============================
5.다시 분할 / 저장하기 전에 (당신은 여전히 폴더를 얻을 것이라고하지만 그것의 한 부분 파일이 것) 1 개 파티션 병합
다시 분할 / 저장하기 전에 (당신은 여전히 폴더를 얻을 것이라고하지만 그것의 한 부분 파일이 것) 1 개 파티션 병합
-
==============================
6.당신이 rdd.coalesce 사용할 수 있습니다 (1, TRUE) .saveAsTextFile (경로)
당신이 rdd.coalesce 사용할 수 있습니다 (1, TRUE) .saveAsTextFile (경로)
이 경로 / 부품-00000에서 하나의 파일로 데이터를 저장할
-
==============================
7.S3에 대한 작동하는 솔루션은 Minkymorgan에서 수정했습니다.
S3에 대한 작동하는 솔루션은 Minkymorgan에서 수정했습니다.
단순히 당신이 원래 디렉토리를 제거하려면 destPath도 deleteSource 지정으로 TXT srcPath 및 단일 최종 CSV / 등 (최종 경로는 다른 이름으로) 임시 분할 된 디렉토리 경로를 전달합니다.
/** * Merges multiple partitions of spark text file output into single file. * @param srcPath source directory of partitioned files * @param dstPath output path of individual path * @param deleteSource whether or not to delete source directory after merging * @param spark sparkSession */ def mergeTextFiles(srcPath: String, dstPath: String, deleteSource: Boolean): Unit = { import org.apache.hadoop.fs.FileUtil import java.net.URI val config = spark.sparkContext.hadoopConfiguration val fs: FileSystem = FileSystem.get(new URI(srcPath), config) FileUtil.copyMerge( fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, config, null ) }
-
==============================
8.불꽃의 df.write () API는 ... 주어진 경로 안에 여러 부품 파일을 만듭니다 스파크 (1) .write.csv (...) 단 하나의 파트 파일 사용 df.coalesce 쓰기 강제로 대신 df.repartition (1 병합 등) .write.csv (...)은 좁은 반면 재분할 변환 다양한 변형 스파크 볼이다 - 재분할 ()을 병합 VS ()
불꽃의 df.write () API는 ... 주어진 경로 안에 여러 부품 파일을 만듭니다 스파크 (1) .write.csv (...) 단 하나의 파트 파일 사용 df.coalesce 쓰기 강제로 대신 df.repartition (1 병합 등) .write.csv (...)은 좁은 반면 재분할 변환 다양한 변형 스파크 볼이다 - 재분할 ()을 병합 VS ()
df.coalesce(1).write.csv(filepath,header=True)
한 부분-0001 -...- c000.csv 파일로 주어진 파일 경로에 폴더를 생성합니다 사용하다
cat filepath/part-0001-...-c000.csv > filename_you_want.csv
사용자 친화적 인 파일 이름을 가지고
-
==============================
9.자바를 사용하는 또 하나 개의 방법이있다
자바를 사용하는 또 하나 개의 방법이있다
import java.io._ def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit) { val p = new java.io.PrintWriter(f); try { op(p) } finally { p.close() } } printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}
from https://stackoverflow.com/questions/31674530/write-single-csv-file-using-spark-csv by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 유형의 차이를 적용 (0) | 2019.10.29 |
---|---|
[SCALA] 어떻게 스파크에 dataframe에 RDD 개체를 변환하는 (0) | 2019.10.29 |
[SCALA] 무엇 =의 차이> () =>, 그리고 단위 => (0) | 2019.10.29 |
[SCALA] 케이스 스칼라에 열거 대 개체 (0) | 2019.10.29 |
[SCALA] 스칼라에서 "바인딩 컨텍스트"는 무엇입니까? (0) | 2019.10.29 |