[SCALA] 어떻게 스칼라를 사용하여 아파치 스파크에 PostgreSQL 데이터베이스에 연결할 수 있습니까?
SCALA어떻게 스칼라를 사용하여 아파치 스파크에 PostgreSQL 데이터베이스에 연결할 수 있습니까?
나는 스칼라에 다음과 같은 일을 할 수있는 방법을 알고 싶어?
나는 어떻게 포장 동안 SBT에 psql의 스칼라의 커넥터 항아리를 가져 오는 스칼라하지만를 사용하여 해야할지?
해결법
-
==============================
1.우리의 목표는 스파크 노동자 병렬 SQL 쿼리를 실행하는 것입니다.
우리의 목표는 스파크 노동자 병렬 SQL 쿼리를 실행하는 것입니다.
build.sbt에 libraryDependencies에 커넥터와 JDBC를 추가합니다. 내 예제에서 그것을 사용할 것이다, 그래서 난 단지, MySQL과 연동이 시도했지만, 포스트 그레스 훨씬 동일해야합니다.
libraryDependencies ++= Seq( jdbc, "mysql" % "mysql-connector-java" % "5.1.29", "org.apache.spark" %% "spark-core" % "1.0.1", // etc )
당신이 SparkContext을 만들 때 실행 프로그램에 복사 할 수있는 단지를 말한다. 커넥터 항아리를 포함합니다. 잘 생긴 방법은이 작업을 수행합니다 :
val classes = Seq( getClass, // To get the jar with our own code. classOf[mysql.jdbc.Driver] // To get the connector. ) val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath()) val conf = new SparkConf().setJars(jars)
이제 스파크는 데이터베이스에 연결할 준비가되어 있습니다. 각 집행 결과 분산 계산을위한 준비가 그래서 쿼리의 일부를 실행합니다.
이를 위해 두 가지 옵션이 있습니다. 오래된 방법은 사용하는 org.apache.spark.rdd.JdbcRDD입니다 :
val rdd = new org.apache.spark.rdd.JdbcRDD( sc, () => { sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred") }, "SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?", 0, 1000, 10, row => row.getString("BOOK_TITLE") )
매개 변수에 대한 설명서를 확인하십시오. 간단히:
아파치 스파크 버전 1.3.0 이후 또 다른 방법은 DataFrame API를 통해 사용할 수 있습니다. 대신 JdbcRDD의 당신은 org.apache.spark.sql.DataFrame을 만들 것입니다 :
val df = sqlContext.load("jdbc", Map( "url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred", "dbtable" -> "BOOKS"))
바로 설정할 수있는 파티션 옵션의 전체 목록 (키 범위와 수를 https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases를 참조하십시오 ) JdbcRDD으로 좋아한다.
JdbcRDD 업데이트를 지원하지 않습니다. 하지만 당신은 단순히 foreachPartition에서 그들을 할 수 있습니다.
rdd.foreachPartition { it => val conn = sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred") val del = conn.prepareStatement("DELETE FROM BOOKS WHERE BOOK_TITLE = ?") for (bookTitle <- it) { del.setString(1, bookTitle) del.executeUpdate } }
(이 파티션 당 하나의 연결을 생성합니다. 그이 필요한 경우, 연결 풀을 사용!)
DataFrames는 createJDBCTable 및 insertIntoJDBC 방법을 통해 업데이트를 지원합니다.
from https://stackoverflow.com/questions/24916852/how-can-i-connect-to-a-postgresql-database-into-apache-spark-using-scala by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 몇 가지 선물을 기다리는 방법 (0) | 2019.11.14 |
---|---|
[SCALA] 어떻게 RDD의 내용을 인쇄하려면? (0) | 2019.11.14 |
[SCALA] 스칼라리스트의 말미에 부가 된 요소 (0) | 2019.11.14 |
[SCALA] 스칼라 함수를 정의의이 세 가지 방법의 차이 (0) | 2019.11.14 |
[SCALA] 포스트 그레스 표에 Dataframes UPSERT 불꽃 (0) | 2019.11.14 |