복붙노트

[SCALA] 스파크-CSV를 사용하여 하나의 CSV 파일을 쓰기

SCALA

스파크-CSV를 사용하여 하나의 CSV 파일을 쓰기

나는 하나의 CSV를 쓰기 위해 노력하고 있어요,하지만 수 없습니다,이 폴더를 만들고하는 https://github.com/databricks/spark-csv을 사용하고 있습니다.

경로와 파일 이름과 같은 매개 변수를 사용하고 CSV 파일을 작성하는 스칼라 기능이 필요합니다.

해결법

  1. ==============================

    1.각 파티션을 개별적으로 저장되어 있기 때문에, 여러 개의 파일과 폴더를 만드는 것입니다. 당신은 당신이 다시 분할 수 (폴더에 계속) 단일 출력 파일이 필요하면 (업스트림 데이터가 크지 만 셔플을 필요로하는 경우 선호) :

    각 파티션을 개별적으로 저장되어 있기 때문에, 여러 개의 파일과 폴더를 만드는 것입니다. 당신은 당신이 다시 분할 수 (폴더에 계속) 단일 출력 파일이 필요하면 (업스트림 데이터가 크지 만 셔플을 필요로하는 경우 선호) :

    df
       .repartition(1)
       .write.format("com.databricks.spark.csv")
       .option("header", "true")
       .save("mydata.csv")
    

    또는 병합 :

    df
       .coalesce(1)
       .write.format("com.databricks.spark.csv")
       .option("header", "true")
       .save("mydata.csv")
    

    저장하기 전에 데이터 프레임 :

    모든 데이터는 / 부품-00000 mydata.csv에 기록됩니다. 이 옵션을 사용하기 전에 당신이 가서 한 근로자에 ​​모든 데이터를 전송하는 비용이 무엇 무엇인지 이해해야합니다. 한 직원에게 먼저 가져와 이후에 스토리지 노드에 분산 - 당신이 복제 분산 파일 시스템을 사용하는 경우, 데이터는 여러 번 옮겨진 것입니다.

    또는 당신은 그대로 코드를 떠나 단순히 이후 모든 부분을 병합 고양이 또는 HDFS의 getmerge 같은 범용 도구를 사용할 수 있습니다.

  2. ==============================

    2.당신은 HDFS와 스파크를 실행하는 경우, 나는 일반적으로 CSV 파일을 작성하고 병합을 할 HDFS를 활용하여 문제를 해결했습니다. 내가 직접 스파크 (1.6)에서 그 일을 해요 :

    당신은 HDFS와 스파크를 실행하는 경우, 나는 일반적으로 CSV 파일을 작성하고 병합을 할 HDFS를 활용하여 문제를 해결했습니다. 내가 직접 스파크 (1.6)에서 그 일을 해요 :

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs._
    
    def merge(srcPath: String, dstPath: String): Unit =  {
       val hadoopConfig = new Configuration()
       val hdfs = FileSystem.get(hadoopConfig)
       FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
       // the "true" setting deletes the source files once they are merged into the new output
    }
    
    
    val newData = << create your dataframe >>
    
    
    val outputfile = "/user/feeds/project/outputs/subject"  
    var filename = "myinsights"
    var outputFileName = outputfile + "/temp_" + filename 
    var mergedFileName = outputfile + "/merged_" + filename
    var mergeFindGlob  = outputFileName
    
        newData.write
            .format("com.databricks.spark.csv")
            .option("header", "false")
            .mode("overwrite")
            .save(outputFileName)
        merge(mergeFindGlob, mergedFileName )
        newData.unpersist()
    

    나는이 트릭을 배운 위치를 기억 할 수 없습니다,하지만 당신을 위해 일한다 수도 있습니다.

  3. ==============================

    3.나는 늦게 여기에 게임에 조금 수 있지만 수 (1) 또는 재분할 (1) 작은 데이터 세트의 작동 할 수 있지만 큰 데이터 세트를 모두 하나 개의 노드에 하나 개의 파티션에 던져 질 것이다 유착 사용. 이 천천히 과정을 기껏 OOM 오류 또는를 던질 가능성이 높습니다.

    나는 늦게 여기에 게임에 조금 수 있지만 수 (1) 또는 재분할 (1) 작은 데이터 세트의 작동 할 수 있지만 큰 데이터 세트를 모두 하나 개의 노드에 하나 개의 파티션에 던져 질 것이다 유착 사용. 이 천천히 과정을 기껏 OOM 오류 또는를 던질 가능성이 높습니다.

    난 당신이 하둡 API에서 FileUtil.copyMerge () 함수를 사용하는 것이 매우를 제안했다. 이것은 하나의 파일로 출력을 병합합니다.

    편집 -이 효과적으로 드라이버보다는 집행 노드에 데이터를 제공합니다. 하나의 집행자가 드라이버보다 사용하기 위해 더 많은 RAM이있는 경우 합체는 () 잘 될 것입니다.

    EDIT 2 copyMerge ()는 3.0 하둡 제거되고있다. 최신 버전에서 작동하는 방법에 대한 자세한 내용은 다음 스택 오버플로 문서를 참조하십시오 하둡은 하둡 3.0 CopyMerge을 수행하는 방법

  4. ==============================

    4.당신이 Databricks을 사용하고 하나의 작업자에 RAM에 모든 데이터를 넣을 수있는 경우 (.coalesce를 사용하여과 (1)), 당신은 찾아 결과 CSV 파일을 이동하는 DBFS을 사용할 수 있습니다 :

    당신이 Databricks을 사용하고 하나의 작업자에 RAM에 모든 데이터를 넣을 수있는 경우 (.coalesce를 사용하여과 (1)), 당신은 찾아 결과 CSV 파일을 이동하는 DBFS을 사용할 수 있습니다 :

    val fileprefix= "/mnt/aws/path/file-prefix"
    
    dataset
      .coalesce(1)       
      .write             
    //.mode("overwrite") // I usually don't use this, but you may want to.
      .option("header", "true")
      .option("delimiter","\t")
      .csv(fileprefix+".tmp")
    
    val partition_path = dbutils.fs.ls(fileprefix+".tmp/")
         .filter(file=>file.name.endsWith(".csv"))(0).path
    
    dbutils.fs.cp(partition_path,fileprefix+".tab")
    
    dbutils.fs.rm(fileprefix+".tmp",recurse=true)
    

    파일이 작업자에 RAM에 맞지 않을 경우, 당신은 FileUtils.copyMerge를 사용하는 chaotic3quilibrium의 제안을 고려할 수 있습니다 (). 나는이 일을하지 않은, 그리고 S3에, 예를 들면, 수인지 아닌지 아직 모른다.

    이 답변은이 질문에 대한 이전 답변뿐만 아니라 제공되는 코드의 내 자신의 테스트에 내장되어 있습니다. 나는 원래 Databricks에 게시 여기 다시 게시하고있다.

    내가 찾은 DBFS의 RM의 재귀 옵션의 가장 좋은 문서는 Databricks 포럼에 있습니다.

  5. ==============================

    5.다시 분할 / 저장하기 전에 (당신은 여전히 ​​폴더를 얻을 것이라고하지만 그것의 한 부분 파일이 것) 1 개 파티션 병합

    다시 분할 / 저장하기 전에 (당신은 여전히 ​​폴더를 얻을 것이라고하지만 그것의 한 부분 파일이 것) 1 개 파티션 병합

  6. ==============================

    6.당신이 rdd.coalesce 사용할 수 있습니다 (1, TRUE) .saveAsTextFile (경로)

    당신이 rdd.coalesce 사용할 수 있습니다 (1, TRUE) .saveAsTextFile (경로)

    이 경로 / 부품-00000에서 하나의 파일로 데이터를 저장할

  7. ==============================

    7.S3에 대한 작동하는 솔루션은 Minkymorgan에서 수정했습니다.

    S3에 대한 작동하는 솔루션은 Minkymorgan에서 수정했습니다.

    단순히 당신이 원래 디렉토리를 제거하려면 destPath도 deleteSource 지정으로 TXT srcPath 및 단일 최종 CSV / 등 (최종 경로는 다른 이름으로) 임시 분할 된 디렉토리 경로를 전달합니다.

    /**
    * Merges multiple partitions of spark text file output into single file. 
    * @param srcPath source directory of partitioned files
    * @param dstPath output path of individual path
    * @param deleteSource whether or not to delete source directory after merging
    * @param spark sparkSession
    */
    def mergeTextFiles(srcPath: String, dstPath: String, deleteSource: Boolean): Unit =  {
      import org.apache.hadoop.fs.FileUtil
      import java.net.URI
      val config = spark.sparkContext.hadoopConfiguration
      val fs: FileSystem = FileSystem.get(new URI(srcPath), config)
      FileUtil.copyMerge(
        fs, new Path(srcPath), fs, new Path(dstPath), deleteSource, config, null
      )
    }
    
  8. ==============================

    8.불꽃의 df.write () API는 ... 주어진 경로 안에 여러 부품 파일을 만듭니다 스파크 (1) .write.csv (...) 단 하나의 파트 파일 사용 df.coalesce 쓰기 강제로 대신 df.repartition (1 병합 등) .write.csv (...)은 좁은 반면 재분할 변환 다양한 변형 스파크 볼이다 - 재분할 ()을 병합 VS ()

    불꽃의 df.write () API는 ... 주어진 경로 안에 여러 부품 파일을 만듭니다 스파크 (1) .write.csv (...) 단 하나의 파트 파일 사용 df.coalesce 쓰기 강제로 대신 df.repartition (1 병합 등) .write.csv (...)은 좁은 반면 재분할 변환 다양한 변형 스파크 볼이다 - 재분할 ()을 병합 VS ()

    df.coalesce(1).write.csv(filepath,header=True) 
    

    한 부분-0001 -...- c000.csv 파일로 주어진 파일 경로에 폴더를 생성합니다 사용하다

    cat filepath/part-0001-...-c000.csv > filename_you_want.csv 
    

    사용자 친화적 인 파일 이름을 가지고

  9. ==============================

    9.자바를 사용하는 또 하나 개의 방법이있다

    자바를 사용하는 또 하나 개의 방법이있다

    import java.io._
    
    def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit) 
      {
         val p = new java.io.PrintWriter(f);  
         try { op(p) } 
         finally { p.close() }
      } 
    
    printToFile(new File("C:/TEMP/df.csv")) { p => df.collect().foreach(p.println)}
    
  10. from https://stackoverflow.com/questions/31674530/write-single-csv-file-using-spark-csv by cc-by-sa and MIT license