[HADOOP] Apache Spark의 기본 키
HADOOPApache Spark의 기본 키
Apache Spark 및 PostgreSQL과 JDBC 연결을하고 데이터베이스에 데이터를 삽입하려고합니다. 추가 모드를 사용할 때 각 DataFrame.Row에 id를 지정해야합니다. Spark에서 기본 키를 만들 수있는 방법이 있습니까?
해결법
-
==============================
1.규모 :
규모 :
고유 번호가 필요한 경우 zipWithUniqueId를 사용하여 DataFrame을 다시 만들 수 있습니다. 먼저 수입 및 더미 자료 :
import sqlContext.implicits._ import org.apache.spark.sql.Row import org.apache.spark.sql.types.{StructType, StructField, LongType} val df = sc.parallelize(Seq( ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
추가 사용을 위해 스키마 추출 :
val schema = df.schema
ID 필드 추가 :
val rows = df.rdd.zipWithUniqueId.map{ case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
DataFrame 만들기 :
val dfWithPK = sqlContext.createDataFrame( rows, StructType(StructField("id", LongType, false) +: schema.fields))
파이썬에서의 동일한 일 :
from pyspark.sql import Row from pyspark.sql.types import StructField, StructType, LongType row = Row("foo", "bar") row_with_index = Row(*["id"] + df.columns) df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF() def make_row(columns): def _make_row(row, uid): row_dict = row.asDict() return row_with_index(*[uid] + [row_dict.get(c) for c in columns]) return _make_row f = make_row(df.columns) df_with_pk = (df.rdd .zipWithUniqueId() .map(lambda x: f(*x)) .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
연속 번호를 선호하면 zipWithUniqueId를 zipWithIndex로 대체 할 수 있지만 조금 더 비쌉니다.
DataFrame API 직접 사용 :
(보편적 인 스칼라, 파이썬, 자바, R과 거의 같은 문법)
이전에 monotonicallyIncreasingId 함수를 놓쳤습니다.이 함수는 연속적인 숫자가 필요하지 않는 한 잘 작동합니다.
import org.apache.spark.sql.functions.monotonicallyIncreasingId df.withColumn("id", monotonicallyIncreasingId).show() // +---+----+-----------+ // |foo| bar| id| // +---+----+-----------+ // | a|-1.0|17179869184| // | b|-2.0|42949672960| // | c|-3.0|60129542144| // +---+----+-----------+
유용한 monotonicallyIncreasingId는 비 결정적입니다. ID는 실행마다 다를 수 있지만 후속 작업에 필터가 포함될 때 행을 식별하는 데 추가 트릭을 사용할 수 없습니다.
노트 :
rowNumber 윈도우 함수를 사용할 수도 있습니다 :
from pyspark.sql.window import Window from pyspark.sql.functions import rowNumber w = Window().orderBy() df.withColumn("id", rowNumber().over(w)).show()
운수 나쁘게:
따라서 데이터를 분할하고 고유성이 현재로서는 특히 유용하지 않다는 것을 보장하는 자연스러운 방법이 없다면 말입니다.
-
==============================
2.
from pyspark.sql.functions import monotonically_increasing_id df.withColumn("id", monotonically_increasing_id()).show()
df.withColumn의 두 번째 인수는 monotonically_increasing_id가 아닌 monotonically_increasing_id ()입니다.
-
==============================
3.zipWithIndex ()가 원하는 동작 인 경우, 즉 연속 된 정수가 필요한 경우에 대해 다음과 같은 솔루션이 비교적 간단하다는 것을 알았습니다.
zipWithIndex ()가 원하는 동작 인 경우, 즉 연속 된 정수가 필요한 경우에 대해 다음과 같은 솔루션이 비교적 간단하다는 것을 알았습니다.
이 경우, 우리는 pyspark를 사용하고 독해력에 의지하여 원본 행 객체를 고유 색인을 포함하는 새로운 스키마에 맞는 새로운 사전에 매핑합니다.
# read the initial dataframe without index dfNoIndex = sqlContext.read.parquet(dataframePath) # Need to zip together with a unique integer # First create a new schema with uuid field appended newSchema = StructType([StructField("uuid", IntegerType(), False)] + dfNoIndex.schema.fields) # zip with the index, map it to a dictionary which includes new field df = dfNoIndex.rdd.zipWithIndex()\ .map(lambda (row, id): {k:v for k, v in row.asDict().items() + [("uuid", id)]})\ .toDF(newSchema)
from https://stackoverflow.com/questions/33102727/primary-keys-with-apache-spark by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 하이브 : 주 테이블에서 증분 업데이트를 수행하는 가장 좋은 방법 (0) | 2019.05.26 |
---|---|
[HADOOP] 축소 단계 후에 출력 파일 병합 (0) | 2019.05.26 |
[HADOOP] Spark - CSV 파일을 DataFrame으로로드 하시겠습니까? (0) | 2019.05.26 |
[HADOOP] Hadoop 프로세스 레코드는 블록 경계에서 어떻게 분리됩니까? (0) | 2019.05.26 |
[HADOOP] 키로 여러 출력에 쓰기 Spark - 하나의 Spark 작업 (0) | 2019.05.26 |