복붙노트

[HADOOP] JDBC 소스에서 데이터를 마이그레이션 할 때 어떻게 분할을 최적화?

HADOOP

JDBC 소스에서 데이터를 마이그레이션 할 때 어떻게 분할을 최적화?

나는 HDFS에 하이브 테이블에 PostgreSQL을 테이블에 테이블에서 데이터를 이동하려합니다. 이를 위해, 나는 다음과 같은 코드를 함께했다 :

  val conf  = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.sql.inMemoryColumnarStorage.compressed", "true").set("spark.sql.orc.filterPushdown","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max","512m").set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName).set("spark.streaming.stopGracefullyOnShutdown","true").set("spark.yarn.driver.memoryOverhead","7168").set("spark.yarn.executor.memoryOverhead","7168").set("spark.sql.shuffle.partitions", "61").set("spark.default.parallelism", "60").set("spark.memory.storageFraction","0.5").set("spark.memory.fraction","0.6").set("spark.memory.offHeap.enabled","true").set("spark.memory.offHeap.size","16g").set("spark.dynamicAllocation.enabled", "false").set("spark.dynamicAllocation.enabled","true").set("spark.shuffle.service.enabled","true")
  val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
  def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = {
        val colList                = allColumns.split(",").toList
        val (partCols, npartCols)  = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
        val queryCols              = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
        val execQuery              = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
        val yearDF                 = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2017")
                                                                      .option("user", devUserName).option("password", devPassword)
                                                                      .option("partitionColumn","cast_id")
                                                                      .option("lowerBound", 1).option("upperBound", 100000)
                                                                      .option("numPartitions",70).load()
        val totalCols:List[String] = splitColumns ++ textList
        val cdt                    = new ChangeDataTypes(totalCols, dataMapper)
        hiveDataTypes              = cdt.gpDetails()
        val fc                     = prepareHiveTableSchema(hiveDataTypes, partition_columns)
        val allColsOrdered         = yearDF.columns.diff(partition_columns) ++ partition_columns
        val allCols                = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
        val resultDF               = yearDF.select(allCols:_*)
        val stringColumns          = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
        val finalDF                = stringColumns.foldLeft(resultDF) {
          (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+"," "))
        }
        finalDF
  }
    val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
    val dataDFPart = dataDF.repartition(30)
    dataDFPart.createOrReplaceTempView("preparedDF")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")

데이터는 동적 prtn_String_columns 기반으로 구획 된 하이브 테이블에 삽입된다 source_system_name, period_year, period_num

사용 스파크 제출 :

SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 --driver-class-path /home/fdlhdpetl/jars/postgresql-42.1.4.jar  --jars /home/fdlhdpetl/jars/postgresql-42.1.4.jar --num-executors 80 --executor-cores 5 --executor-memory 50G --driver-memory 20G --driver-cores 3 --class com.partition.source.YearPartition splinter_2.11-0.1.jar --master=yarn --deploy-mode=cluster --keytab /home/fdlhdpetl/fdlhdpetl.keytab --principal fdlhdpetl@FDLDEV.COM --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar

다음과 같은 오류 메시지가 실행 프로그램 로그에 생성됩니다

Container exited with a non-zero exit code 143.
Killed by external signal
18/10/03 15:37:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[SIGTERM handler,9,system]
java.lang.OutOfMemoryError: Java heap space
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:88)
    at java.util.zip.ZipFile$ZipFileInflaterInputStream.<init>(ZipFile.java:393)
    at java.util.zip.ZipFile.getInputStream(ZipFile.java:374)
    at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
    at java.util.jar.JarFile.getManifest(JarFile.java:180)
    at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:745)

나는 읽기는 다음과 같이 파티션의 주어진 숫자와 제대로 실행되고있는 로그에서 볼 수 :

Scan JDBCRelation((select column_names from schema.tablename where period_year='2017' and period_num='12') as year2017) [numPartitions=50]

다음 단계에서 집행의 상태는 다음과 같습니다

데이터는 적절히 분배되는 것은 아니다. 다른 하나는 거대한되고있는 동안 하나 개의 파티션은 작다. 스큐 문제는 여기에있다. 하이브 테이블에 데이터를 삽입하는 동안 작업이 줄에서 실패 spark.sql (들 "INSERT 덮어 쓰기 표 schema.hivetable 파티션 ($ {prtn_String_columns})를 선택 * preparedDF에서")하지만이 때문에 데이터 스큐의 일이 일어나고 이해 문제.

난 그냥 csv 파일로 저장하려고 대신 하이브 테이블에 dataframe를 저장하지만 아무것도 예외를주는에서 실행에 영향을주지 않습니다, 집행자 메모리, 드라이버 메모리를 증가 집행의 수를 늘리기 위해 노력했다 :

java.lang.OutOfMemoryError: GC overhead limit exceeded

내가 수정하는 데 필요한 코드가 있나요? 사람이 제가이 문제를 해결할 수있는 방법을 알려 주시겠습니까?

해결법

  1. ==============================

    1.* pseudocolumns를 사용하지 마십시오 - 의사 열을 점화 JDBC에.

    * pseudocolumns를 사용하지 마십시오 - 의사 열을 점화 JDBC에.

  2. ==============================

    2.내 경험에 차이를 만들어 메모리 설정 4 종류가있다 :

    내 경험에 차이를 만들어 메모리 설정 4 종류가있다 :

    프로그램 스택을 유지하기위한 처리 이유로 VS [2] 힙 공간 데이터를 저장하기위한 A) [1] 메모리

    B) [1] 드라이버 VS [2]를 실행 프로그램 메모리

    지금까지, 나는 항상 메모리의 적절한 종류를 증가시켜 성공적으로 실행 내 스파크 작업을 얻을 수 있었다 :

    A2-B1 따라서 프로그램 스택을 보유 할 드라이버에서 사용할 수있는 메모리 될 것이다. 기타.

    다음과 같이 속성 이름은 다음과 같습니다 :

    A1-B1) 실행 프로그램 메모리

    A1-B2) 드라이버 메모리

    A2-B1) spark.yarn.executor.memoryOverhead

    A2-B2) spark.yarn.driver.memoryOverhead

    모두의 합이 * -B1은 * -B2이 드라이버 노드의 메모리보다 작아야합니다 당신의 노동자의 사용 가능한 메모리와 모든의 합보다 작아야합니다 있음을 유의하십시오.

    내 좋은 방법은 범인이 대담하게 표시 힙 설정 중 하나입니다 될 것입니다.

  3. ==============================

    3.중복으로 여기 라우팅 당신의 또 다른 질문이 있었다

    중복으로 여기 라우팅 당신의 또 다른 질문이 있었다

     'How to avoid data skewing while reading huge datasets or tables into spark? 
      The data is not being partitioned properly. One partition is smaller while the 
      other one becomes huge on read.
      I observed that one of the partition has nearly 2million rows and 
      while inserting there is a skew in partition. '
    

    문제는, 읽기 후 dataframe에 분할 데이터를 처리하는 경우에는 "numPartitions"값을 증가 놀았 있나요?

    .option("numPartitions",50)
    

    LOWERBOUND 생성 WHERE 절 식과 numpartitions위한 UPPERBOUND 형태 파티션 진보 분할 수를 결정한다.

    예를 들어 말, sometable 열이 - ID를 (우리는 partitionColumn로 선택)를; 열-ID가 1000-1 출신에 대한 값의 범위는 우리가 테이블에보고 우리는 * sometable에서 선택 실행하여 모든 레코드를 얻으려면 그래서 우리는 LOWERBOUND = 1 UPPERBOUND = 1000 numpartition = 4 것

    이것은 우리의 공급에 기초 SQL을 구축하여 각 쿼리의 결과 4 분할의 dataframe를 생성한다 (LOWERBOUND = 1 UPPERBOUND = 1000 numpartition = 4)

    select * from sometable where ID < 250
    select * from sometable where ID >= 250 and ID < 500
    select * from sometable where ID >= 500 and ID < 750
    select * from sometable where ID >= 750
    

    우리 테이블의 레코드의 대부분은 ID의 범위 (500750) 내에서 무엇을 가을합니다. 그건 당신이에에있는 상황입니다.

    우리가 numpartition을 증가 할 때, 분할은 더욱 발생하고이를 같은 파티션에 기록의 양을 줄일 수 있지만, 좋은 샷이 아닙니다.

    대신 당신이 그렇게 자신에 의해 분할을 공급 생각하면 우리가 제공 경계를 기반으로 partitioncolumn을 분할 스파크의 데이터를 균일하게 할 수있다 스플릿. 다른 JDBC 방식으로 전환 할 경우 대신 (LOWERBOUND, 어느 상한선 및 numpartition은) 우리가 제공 할 수있는 필요 직접 술어.

    def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame 
    

    링크

  4. from https://stackoverflow.com/questions/52591982/what-is-the-best-strategy-to-load-huge-datasets-data-into-hive-tables-using-spar by cc-by-sa and MIT license