복붙노트

[SCALA] 어떻게 스칼라를 사용하여 아파치 스파크에 PostgreSQL 데이터베이스에 연결할 수 있습니까?

SCALA

어떻게 스칼라를 사용하여 아파치 스파크에 PostgreSQL 데이터베이스에 연결할 수 있습니까?

나는 스칼라에 다음과 같은 일을 할 수있는 방법을 알고 싶어?

나는 어떻게 포장 동안 SBT에 psql의 스칼라의 커넥터 항아리를 가져 오는 스칼라하지만를 사용하여 해야할지?

해결법

  1. ==============================

    1.우리의 목표는 스파크 노동자 병렬 SQL 쿼리를 실행하는 것입니다.

    우리의 목표는 스파크 노동자 병렬 SQL 쿼리를 실행하는 것입니다.

    build.sbt에 libraryDependencies에 커넥터와 JDBC를 추가합니다. 내 예제에서 그것을 사용할 것이다, 그래서 난 단지, MySQL과 연동이 시도했지만, 포스트 그레스 훨씬 동일해야합니다.

    libraryDependencies ++= Seq(
      jdbc,
      "mysql" % "mysql-connector-java" % "5.1.29",
      "org.apache.spark" %% "spark-core" % "1.0.1",
      // etc
    )
    

    당신이 SparkContext을 만들 때 실행 프로그램에 복사 할 수있는 단지를 말한다. 커넥터 항아리를 포함합니다. 잘 생긴 방법은이 작업을 수행합니다 :

    val classes = Seq(
      getClass,                   // To get the jar with our own code.
      classOf[mysql.jdbc.Driver]  // To get the connector.
    )
    val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath())
    val conf = new SparkConf().setJars(jars)
    

    이제 스파크는 데이터베이스에 연결할 준비가되어 있습니다. 각 집행 결과 분산 계산을위한 준비가 그래서 쿼리의 일부를 실행합니다.

    이를 위해 두 가지 옵션이 있습니다. 오래된 방법은 사용하는 org.apache.spark.rdd.JdbcRDD입니다 :

    val rdd = new org.apache.spark.rdd.JdbcRDD(
      sc,
      () => {
        sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
      },
      "SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?",
      0, 1000, 10,
      row => row.getString("BOOK_TITLE")
    )
    

    매개 변수에 대한 설명서를 확인하십시오. 간단히:

    아파치 스파크 버전 1.3.0 이후 또 다른 방법은 DataFrame API를 통해 사용할 수 있습니다. 대신 JdbcRDD의 당신은 org.apache.spark.sql.DataFrame을 만들 것입니다 :

    val df = sqlContext.load("jdbc", Map(
      "url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred",
      "dbtable" -> "BOOKS"))
    

    바로 설정할 수있는 파티션 옵션의 전체 목록 (키 범위와 수를 https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases를 참조하십시오 ) JdbcRDD으로 좋아한다.

    JdbcRDD 업데이트를 지원하지 않습니다. 하지만 당신은 단순히 foreachPartition에서 그들을 할 수 있습니다.

    rdd.foreachPartition { it =>
      val conn = sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
      val del = conn.prepareStatement("DELETE FROM BOOKS WHERE BOOK_TITLE = ?")
      for (bookTitle <- it) {
        del.setString(1, bookTitle)
        del.executeUpdate
      }
    }
    

    (이 파티션 당 하나의 연결을 생성합니다. 그이 필요한 경우, 연결 풀을 사용!)

    DataFrames는 createJDBCTable 및 insertIntoJDBC 방법을 통해 업데이트를 지원합니다.

  2. from https://stackoverflow.com/questions/24916852/how-can-i-connect-to-a-postgresql-database-into-apache-spark-using-scala by cc-by-sa and MIT license