[SCALA] 포스트 그레스 표에 Dataframes UPSERT 불꽃
SCALA포스트 그레스 표에 Dataframes UPSERT 불꽃
나는 두 개의 데이터 소스를 결합하고 다른 DataFrame 같은 결과를 얻을 수 아파치 스파크 DataFrames을 사용하고 있습니다. 나는 다른 포스트 그레스 테이블에 결과를 쓰고 싶어요. 나는이 옵션을 보려면 :
myDataFrame.write.jdbc(url, table, connectionProperties)
하지만, 내가 뭘 원하는 UPSERT은 테이블의 기본 키에 따라 테이블에 dataframe입니다. 이것은 어떻게 할 수 있나요? 나는 스파크 1.6.0를 사용하고 있습니다.
해결법
-
==============================
1.이 지원되지 않습니다. DataFrameWriter 중 하나를 추가 할 또는 덮어 쓰기 테이블을 기존의 수 있습니다. 응용 프로그램이 더 복잡한 로직을 필요로하는 경우이 수동으로 처리해야합니다.
이 지원되지 않습니다. DataFrameWriter 중 하나를 추가 할 또는 덮어 쓰기 테이블을 기존의 수 있습니다. 응용 프로그램이 더 복잡한 로직을 필요로하는 경우이 수동으로 처리해야합니다.
하나의 옵션은 표준 JDBC 연결 작업 (foreach는, foreachPartition)를 사용하는 것입니다. 또 다른 하나는 임시로 작성하고 데이터베이스에 직접 나머지를 처리하는 것입니다.
-
==============================
2.KrisP 그것의 권리가있다. upsert을 수행하는 가장 좋은 방법은 준비된 성명을 통해이 아니다. 그것은이 방법으로 당신이 가지고있는 근로자의 수 많은 파티션으로 한 번에 하나의 삽입 점에 유의하는 것이 중요합니다. 당신은 배치 할 수에서뿐만 아니라을이 작업을 수행하려면
KrisP 그것의 권리가있다. upsert을 수행하는 가장 좋은 방법은 준비된 성명을 통해이 아니다. 그것은이 방법으로 당신이 가지고있는 근로자의 수 많은 파티션으로 한 번에 하나의 삽입 점에 유의하는 것이 중요합니다. 당신은 배치 할 수에서뿐만 아니라을이 작업을 수행하려면
import java.sql._ dataframe.coalesce("NUMBER OF WORKERS").mapPartitions((d) => Iterator(d)).foreach { batch => val dbc: Connection = DriverManager.getConnection("JDBCURL") val st: PreparedStatement = dbc.prepareStatement("YOUR PREPARED STATEMENT") batch.grouped("# Of Rows you want per batch").foreach { session => session.foreach { x => st.setDouble(1, x.getDouble(1)) st.addBatch() } st.executeBatch() } dbc.close() }
이것은 각 작업자 배치를 실행하고 DB 연결을 닫습니다. 그것은 당신이 얼마나 많은 노동자, 얼마나 많은 배치를 제어 할 수 있습니다 그 범위 내에서 작업 할 수 있습니다.
-
==============================
3.수동 및 zero323 언급 옵션 1을 통해 그것을하기 위하여려고하는 경우에, 당신은 여기에 삽입 문에 대한 스파크 소스 코드를 살펴한다
수동 및 zero323 언급 옵션 1을 통해 그것을하기 위하여려고하는 경우에, 당신은 여기에 삽입 문에 대한 스파크 소스 코드를 살펴한다
def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = { val columns = rddSchema.fields.map(_.name).mkString(",") val placeholders = rddSchema.fields.map(_ => "?").mkString(",") val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)" conn.prepareStatement(sql) }
PreparedStatement의은 java.sql의 일부이며 방법 () 및 executeUpdate의 ()을 실행하고자 갖는다. 당신은 여전히 물론, 그에 따라 SQL을 수정해야합니다.
-
==============================
4.JDBC를 삽입하려면 사용할 수 있습니다
JDBC를 삽입하려면 사용할 수 있습니다
dataframe.write.mode (SaveMode.Append) .jdbc (jdbc_url, TABLE_NAME, connection_properties)
또한, Dataframe.write 당신에게 DataFrameWriter를 제공하고 그것은 dataframe를 삽입하는 몇 가지 방법이있다.
데프 insertInto (TABLENAME : 문자열) : 단위
삽입 지정된 테이블에 DataFrame의 내용. 그것은 DataFrame의 스키마는 테이블의 스키마과 동일해야합니다.
그것은 기존의 테이블, 형식 또는 옵션에 데이터를 삽입하기 때문에 무시됩니다.
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
아무것도 아직 스파크 생각에서 밖으로 상자의 개별 레코드를 업데이트 없습니다
from https://stackoverflow.com/questions/34643200/spark-dataframes-upsert-to-postgres-table by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 스칼라리스트의 말미에 부가 된 요소 (0) | 2019.11.14 |
---|---|
[SCALA] 스칼라 함수를 정의의이 세 가지 방법의 차이 (0) | 2019.11.14 |
[SCALA] 스칼라 파일에서 jar 파일 만들기 (0) | 2019.11.14 |
[SCALA] 자원 스칼라 폴더에서 어떻게 파일을 읽을? (0) | 2019.11.14 |
[SCALA] Scalaz 상태 모나드 예 (0) | 2019.11.14 |