복붙노트

[HADOOP] Apache Spark의 기본 키

HADOOP

Apache Spark의 기본 키

Apache Spark 및 PostgreSQL과 JDBC 연결을하고 데이터베이스에 데이터를 삽입하려고합니다. 추가 모드를 사용할 때 각 DataFrame.Row에 id를 지정해야합니다. Spark에서 기본 키를 만들 수있는 방법이 있습니까?

해결법

  1. ==============================

    1.규모 :

    규모 :

    고유 번호가 필요한 경우 zipWithUniqueId를 사용하여 DataFrame을 다시 만들 수 있습니다. 먼저 수입 및 더미 자료 :

    import sqlContext.implicits._
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructType, StructField, LongType}
    
    val df = sc.parallelize(Seq(
        ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
    

    추가 사용을 위해 스키마 추출 :

    val schema = df.schema
    

    ID 필드 추가 :

    val rows = df.rdd.zipWithUniqueId.map{
       case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
    

    DataFrame 만들기 :

    val dfWithPK = sqlContext.createDataFrame(
      rows, StructType(StructField("id", LongType, false) +: schema.fields))
    

    파이썬에서의 동일한 일 :

    from pyspark.sql import Row
    from pyspark.sql.types import StructField, StructType, LongType
    
    row = Row("foo", "bar")
    row_with_index = Row(*["id"] + df.columns)
    
    df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()
    
    def make_row(columns):
        def _make_row(row, uid):
            row_dict = row.asDict()
            return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
        return _make_row
    
    f = make_row(df.columns)
    
    df_with_pk = (df.rdd
        .zipWithUniqueId()
        .map(lambda x: f(*x))
        .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
    

    연속 번호를 선호하면 zipWithUniqueId를 zipWithIndex로 대체 할 수 있지만 조금 더 비쌉니다.

    DataFrame API 직접 사용 :

    (보편적 인 스칼라, 파이썬, 자바, R과 거의 같은 문법)

    이전에 monotonicallyIncreasingId 함수를 놓쳤습니다.이 함수는 연속적인 숫자가 필요하지 않는 한 잘 작동합니다.

    import org.apache.spark.sql.functions.monotonicallyIncreasingId
    
    df.withColumn("id", monotonicallyIncreasingId).show()
    // +---+----+-----------+
    // |foo| bar|         id|
    // +---+----+-----------+
    // |  a|-1.0|17179869184|
    // |  b|-2.0|42949672960|
    // |  c|-3.0|60129542144|
    // +---+----+-----------+
    

    유용한 monotonicallyIncreasingId는 비 결정적입니다. ID는 실행마다 다를 수 있지만 후속 작업에 필터가 포함될 때 행을 식별하는 데 추가 트릭을 사용할 수 없습니다.

    노트 :

    rowNumber 윈도우 함수를 사용할 수도 있습니다 :

    from pyspark.sql.window import Window
    from pyspark.sql.functions import rowNumber
    
    w = Window().orderBy()
    df.withColumn("id", rowNumber().over(w)).show()
    

    운수 나쁘게:

    따라서 데이터를 분할하고 고유성이 현재로서는 특히 유용하지 않다는 것을 보장하는 자연스러운 방법이 없다면 말입니다.

  2. ==============================

    2.

    from pyspark.sql.functions import monotonically_increasing_id
    
    df.withColumn("id", monotonically_increasing_id()).show()
    

    df.withColumn의 두 번째 인수는 monotonically_increasing_id가 아닌 monotonically_increasing_id ()입니다.

  3. ==============================

    3.zipWithIndex ()가 원하는 동작 인 경우, 즉 연속 된 정수가 필요한 경우에 대해 다음과 같은 솔루션이 비교적 간단하다는 것을 알았습니다.

    zipWithIndex ()가 원하는 동작 인 경우, 즉 연속 된 정수가 필요한 경우에 대해 다음과 같은 솔루션이 비교적 간단하다는 것을 알았습니다.

    이 경우, 우리는 pyspark를 사용하고 독해력에 의지하여 원본 행 객체를 고유 색인을 포함하는 새로운 스키마에 맞는 새로운 사전에 매핑합니다.

    # read the initial dataframe without index
    dfNoIndex = sqlContext.read.parquet(dataframePath)
    # Need to zip together with a unique integer
    
    # First create a new schema with uuid field appended
    newSchema = StructType([StructField("uuid", IntegerType(), False)]
                           + dfNoIndex.schema.fields)
    # zip with the index, map it to a dictionary which includes new field
    df = dfNoIndex.rdd.zipWithIndex()\
                          .map(lambda (row, id): {k:v
                                                  for k, v
                                                  in row.asDict().items() + [("uuid", id)]})\
                          .toDF(newSchema)
    
  4. from https://stackoverflow.com/questions/33102727/primary-keys-with-apache-spark by cc-by-sa and MIT license