복붙노트

[SCALA] 포스트 그레스 표에 Dataframes UPSERT 불꽃

SCALA

포스트 그레스 표에 Dataframes UPSERT 불꽃

나는 두 개의 데이터 소스를 결합하고 다른 DataFrame 같은 결과를 얻을 수 아파치 스파크 DataFrames을 사용하고 있습니다. 나는 다른 포스트 그레스 테이블에 결과를 쓰고 싶어요. 나는이 옵션을 보려면 :

myDataFrame.write.jdbc(url, table, connectionProperties)

하지만, 내가 뭘 원하는 UPSERT은 테이블의 기본 키에 따라 테이블에 dataframe입니다. 이것은 어떻게 할 수 있나요? 나는 스파크 1.6.0를 사용하고 있습니다.

해결법

  1. ==============================

    1.이 지원되지 않습니다. DataFrameWriter 중 하나를 추가 할 또는 덮어 쓰기 테이블을 기존의 수 있습니다. 응용 프로그램이 더 복잡한 로직을 필요로하는 경우이 수동으로 처리해야합니다.

    이 지원되지 않습니다. DataFrameWriter 중 하나를 추가 할 또는 덮어 쓰기 테이블을 기존의 수 있습니다. 응용 프로그램이 더 복잡한 로직을 필요로하는 경우이 수동으로 처리해야합니다.

    하나의 옵션은 표준 JDBC 연결 작업 (foreach는, foreachPartition)를 사용하는 것입니다. 또 다른 하나는 임시로 작성하고 데이터베이스에 직접 나머지를 처리하는 것입니다.

  2. ==============================

    2.KrisP 그것의 권리가있다. upsert을 수행하는 가장 좋은 방법은 준비된 성명을 통해이 아니다. 그것은이 방법으로 당신이 가지고있는 근로자의 수 많은 파티션으로 한 번에 하나의 삽입 점에 유의하는 것이 중요합니다. 당신은 배치 할 수에서뿐만 아니라을이 작업을 수행하려면

    KrisP 그것의 권리가있다. upsert을 수행하는 가장 좋은 방법은 준비된 성명을 통해이 아니다. 그것은이 방법으로 당신이 가지고있는 근로자의 수 많은 파티션으로 한 번에 하나의 삽입 점에 유의하는 것이 중요합니다. 당신은 배치 할 수에서뿐만 아니라을이 작업을 수행하려면

    import java.sql._
    dataframe.coalesce("NUMBER OF WORKERS").mapPartitions((d) => Iterator(d)).foreach { batch =>
      val dbc: Connection = DriverManager.getConnection("JDBCURL")
      val st: PreparedStatement = dbc.prepareStatement("YOUR PREPARED STATEMENT")
    
      batch.grouped("# Of Rows you want per batch").foreach { session =>
        session.foreach { x =>
          st.setDouble(1, x.getDouble(1)) 
          st.addBatch()
        }
        st.executeBatch()
      }
      dbc.close()
    }
    

    이것은 각 작업자 배치를 실행하고 DB 연결을 닫습니다. 그것은 당신이 얼마나 많은 노동자, 얼마나 많은 배치를 제어 할 수 있습니다 그 범위 내에서 작업 할 수 있습니다.

  3. ==============================

    3.수동 및 zero323 언급 옵션 1을 통해 그것을하기 위하여려고하는 경우에, 당신은 여기에 삽입 문에 대한 스파크 소스 코드를 살펴한다

    수동 및 zero323 언급 옵션 1을 통해 그것을하기 위하여려고하는 경우에, 당신은 여기에 삽입 문에 대한 스파크 소스 코드를 살펴한다

      def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = {
        val columns = rddSchema.fields.map(_.name).mkString(",")
        val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
        val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)"
        conn.prepareStatement(sql)
      }
    

    PreparedStatement의은 java.sql의 일부이며 방법 () 및 executeUpdate의 ()을 실행하고자 갖는다. 당신은 여전히 ​​물론, 그에 따라 SQL을 수정해야합니다.

  4. ==============================

    4.JDBC를 삽입하려면 사용할 수 있습니다

    JDBC를 삽입하려면 사용할 수 있습니다

    dataframe.write.mode (SaveMode.Append) .jdbc (jdbc_url, TABLE_NAME, connection_properties)

    또한, Dataframe.write 당신에게 DataFrameWriter를 제공하고 그것은 dataframe를 삽입하는 몇 가지 방법이있다.

    데프 insertInto (TABLENAME : 문자열) : 단위

    삽입 지정된 테이블에 DataFrame의 내용. 그것은 DataFrame의 스키마는 테이블의 스키마과 동일해야합니다.

    그것은 기존의 테이블, 형식 또는 옵션에 데이터를 삽입하기 때문에 무시됩니다.

    http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter

    아무것도 아직 스파크 생각에서 밖으로 상자의 개별 레코드를 업데이트 없습니다

  5. from https://stackoverflow.com/questions/34643200/spark-dataframes-upsert-to-postgres-table by cc-by-sa and MIT license