복붙노트

[SCALA] 어떻게 디스크에 CSV로 스파크 DataFrame을 저장?

SCALA

어떻게 디스크에 CSV로 스파크 DataFrame을 저장?

예를 들어,이 결과 :

df.filter("project = 'en'").select("title","count").groupBy("title").sum()

배열을 반환합니다.

어떻게 디스크에 CSV 파일로 스파크 DataFrame을 저장?

해결법

  1. ==============================

    1.아파치 스파크는 디스크에 기본 CSV 출력을 지원하지 않습니다.

    아파치 스파크는 디스크에 기본 CSV 출력을 지원하지 않습니다.

    당신은 네 가지 가능한 솔루션 생각을 가지고 :

    참고 : 솔루션 1, 2, 3은 스파크 호출 저장 호출하는 기본 하둡 API에 의해 생성 된 CSV 형식의 파일 (파트 - *)가 발생합니다. 당신은 파티션 당 하나 개의 파트 - 파일이있을 것이다.

  2. ==============================

    2.내가 정의 된 이름의 CSV 파일로 dataframe의 내용을 저장했다 어디 유사한 문제가 있었다. df.write ( "CSV는"). ( "<내-경로>")는 파일이 아닌 디렉토리를 생성하고 저장합니다. 그래서 다음과 같은 해결책을 제공합니다. 코드의 대부분은 논리에 약간의 수정을 다음 dataframe - 투 - CSV에서 가져옵니다.

    내가 정의 된 이름의 CSV 파일로 dataframe의 내용을 저장했다 어디 유사한 문제가 있었다. df.write ( "CSV는"). ( "<내-경로>")는 파일이 아닌 디렉토리를 생성하고 저장합니다. 그래서 다음과 같은 해결책을 제공합니다. 코드의 대부분은 논리에 약간의 수정을 다음 dataframe - 투 - CSV에서 가져옵니다.

    def saveDfToCsv(df: DataFrame, tsvOutput: String, sep: String = ",", header: Boolean = false): Unit = {
        val tmpParquetDir = "Posts.tmp.parquet"
    
        df.repartition(1).write.
            format("com.databricks.spark.csv").
            option("header", header.toString).
            option("delimiter", sep).
            save(tmpParquetDir)
    
        val dir = new File(tmpParquetDir)
        val newFileRgex = tmpParquetDir + File.separatorChar + ".part-00000.*.csv"
        val tmpTsfFile = dir.listFiles.filter(_.toPath.toString.matches(newFileRgex))(0).toString
        (new File(tmpTsvFile)).renameTo(new File(tsvOutput))
    
        dir.listFiles.foreach( f => f.delete )
        dir.delete
        }
    
  3. ==============================

    3.CSV로 디스크에 dataframe를 작성하는 것은 CSV에서 유사한 읽기입니다. 당신은 하나 개의 파일로 결과를 원한다면, 당신은 유착을 사용할 수 있습니다.

    CSV로 디스크에 dataframe를 작성하는 것은 CSV에서 유사한 읽기입니다. 당신은 하나 개의 파일로 결과를 원한다면, 당신은 유착을 사용할 수 있습니다.

    df.coalesce(1)
          .write
          .option("header","true")
          .option("sep",",")
          .mode("overwrite")
          .csv("output/path")
    

    당신의 결과는 당신이 언어 특정 솔루션을 사용한다 배열 인 경우, dataframe API를 촉발하지. 모든 때문에 결과의 이러한 종류의 드라이버 기계를 반환합니다.

  4. ==============================

    4.나는 비슷한 문제가 있었다. 나는 클라이언트 모드에서 클러스터에 연결 동안 나는 드라이버에 CSV 파일을 작성했습니다.

    나는 비슷한 문제가 있었다. 나는 클라이언트 모드에서 클러스터에 연결 동안 나는 드라이버에 CSV 파일을 작성했습니다.

    나는 잠재적 인 오류를 방지하기 위해 아파치 스파크와 같은 CSV 구문 분석 코드를 재사용하고 싶었다.

    나는 불꽃-CSV 코드를 확인하고 com.databricks.spark.csv.CsvSchemaRDD에서 [문자열] 원시의 CSV RDD에 dataframe 변환에 대한 책임 코드를 발견했다.

    슬프게도 그것은 sc.textFile 및 관련 메소드의 끝과 하드 코딩되어있다.

    나는 코드와 sc.textFile 제거 마지막 라인과 직접 대신 RDD을 반환 된-붙여 복사합니다.

    내 코드 :

    /*
      This is copypasta from com.databricks.spark.csv.CsvSchemaRDD
      Spark's code has perfect method converting Dataframe -> raw csv RDD[String]
      But in last lines of that method it's hardcoded against writing as text file -
      for our case we need RDD.
     */
    object DataframeToRawCsvRDD {
    
      val defaultCsvFormat = com.databricks.spark.csv.defaultCsvFormat
    
      def apply(dataFrame: DataFrame, parameters: Map[String, String] = Map())
               (implicit ctx: ExecutionContext): RDD[String] = {
        val delimiter = parameters.getOrElse("delimiter", ",")
        val delimiterChar = if (delimiter.length == 1) {
          delimiter.charAt(0)
        } else {
          throw new Exception("Delimiter cannot be more than one character.")
        }
    
        val escape = parameters.getOrElse("escape", null)
        val escapeChar: Character = if (escape == null) {
          null
        } else if (escape.length == 1) {
          escape.charAt(0)
        } else {
          throw new Exception("Escape character cannot be more than one character.")
        }
    
        val quote = parameters.getOrElse("quote", "\"")
        val quoteChar: Character = if (quote == null) {
          null
        } else if (quote.length == 1) {
          quote.charAt(0)
        } else {
          throw new Exception("Quotation cannot be more than one character.")
        }
    
        val quoteModeString = parameters.getOrElse("quoteMode", "MINIMAL")
        val quoteMode: QuoteMode = if (quoteModeString == null) {
          null
        } else {
          QuoteMode.valueOf(quoteModeString.toUpperCase)
        }
    
        val nullValue = parameters.getOrElse("nullValue", "null")
    
        val csvFormat = defaultCsvFormat
          .withDelimiter(delimiterChar)
          .withQuote(quoteChar)
          .withEscape(escapeChar)
          .withQuoteMode(quoteMode)
          .withSkipHeaderRecord(false)
          .withNullString(nullValue)
    
        val generateHeader = parameters.getOrElse("header", "false").toBoolean
        val headerRdd = if (generateHeader) {
          ctx.sparkContext.parallelize(Seq(
            csvFormat.format(dataFrame.columns.map(_.asInstanceOf[AnyRef]): _*)
          ))
        } else {
          ctx.sparkContext.emptyRDD[String]
        }
    
        val rowsRdd = dataFrame.rdd.map(row => {
          csvFormat.format(row.toSeq.map(_.asInstanceOf[AnyRef]): _*)
        })
    
        headerRdd union rowsRdd
      }
    
    }
    
  5. from https://stackoverflow.com/questions/33174443/how-to-save-a-spark-dataframe-as-csv-on-disk by cc-by-sa and MIT license