[SCALA] 스파크에서 두 개 이상의 DataFrame을 압축하는 방법
SCALA스파크에서 두 개 이상의 DataFrame을 압축하는 방법
나는 두 DataFrame A와 B가있다. a는 같다
Column 1 | Column 2
abc | 123
cde | 23
B는 같다
Column 1
1
2
나는 a와 b (또는 그 이상) 같은 것을하게 DataFrames을 압축 할 :
Column 1 | Column 2 | Column 3
abc | 123 | 1
cde | 23 | 2
내가 어떻게 해?
해결법
-
==============================
1.이 같은 작업은 DataFrame의 API를 지원하지 않습니다. 두 RDDs를 압축 할 수 있지만, 당신은 파티션 당 요소의 두 파티션의 수와 수와 일치해야 작동하게 할 수 있습니다. 이 경우 가정 :
이 같은 작업은 DataFrame의 API를 지원하지 않습니다. 두 RDDs를 압축 할 수 있지만, 당신은 파티션 당 요소의 두 파티션의 수와 수와 일치해야 작동하게 할 수 있습니다. 이 경우 가정 :
import org.apache.spark.sql.DataFrame import org.apache.spark.sql.Row import org.apache.spark.sql.types.{StructField, StructType, LongType} val a: DataFrame = sc.parallelize(Seq( ("abc", 123), ("cde", 23))).toDF("column_1", "column_2") val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3") // Merge rows val rows = a.rdd.zip(b.rdd).map{ case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)} // Merge schemas val schema = StructType(a.schema.fields ++ b.schema.fields) // Create new data frame val ab: DataFrame = sqlContext.createDataFrame(rows, schema)
위의 조건은 인덱스와 조인을 추가하고 마음에 오는 유일한 옵션을 충족하지 않는 경우 :
def addIndex(df: DataFrame) = sqlContext.createDataFrame( // Add index df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)}, // Create schema StructType(df.schema.fields :+ StructField("_index", LongType, false)) ) // Add indices val aWithIndex = addIndex(a) val bWithIndex = addIndex(b) // Join and clean val ab = aWithIndex .join(bWithIndex, Seq("_index")) .drop("_index")
-
==============================
2.Dataframes의 스칼라의 구현에서는, 하나에 두 개의 dataframes을 연결하는 간단한 방법이 없습니다. 우리는 단순히 dataframes의 각 행에 인덱스를 추가하여이 문제를 해결 할 수 있습니다. 그런 다음, 우리는 않는 내부 이러한 인덱스에 의해 가입 할 수 있습니다. 이것은이 구현의 내 스텁 코드입니다 :
Dataframes의 스칼라의 구현에서는, 하나에 두 개의 dataframes을 연결하는 간단한 방법이 없습니다. 우리는 단순히 dataframes의 각 행에 인덱스를 추가하여이 문제를 해결 할 수 있습니다. 그런 다음, 우리는 않는 내부 이러한 인덱스에 의해 가입 할 수 있습니다. 이것은이 구현의 내 스텁 코드입니다 :
val a: DataFrame = sc.parallelize(Seq(("abc", 123), ("cde", 23))).toDF("column_1", "column_2") val aWithId: DataFrame = a.withColumn("id",monotonicallyIncreasingId) val b: DataFrame = sc.parallelize(Seq((1), (2))).toDF("column_3") val bWithId: DataFrame = b.withColumn("id",monotonicallyIncreasingId) aWithId.join(bWithId, "id")
약간 가벼운 독서 - 파이썬은이 작업을 수행하는 방법을 확인하세요!
-
==============================
3.무엇 순수 SQL에 대한?
무엇 순수 SQL에 대한?
SELECT room_name, sender_nickname, message_id, row_number() over (partition by room_name order by message_id) as message_index, row_number() over (partition by room_name, sender_nickname order by message_id) as user_message_index from messages order by room_name, message_id
-
==============================
4.나는 영업 이익은 스칼라를 사용하고 있었다 알고 있지만 나처럼, 당신은 pyspark에서이 작업을 수행하는 방법을 알아야이라면 아래의 파이썬 코드를 사용해보십시오. @ zero323 최초의 솔루션처럼 그것은 RDD.zip (의존) 모두 DataFrames 파티션 같은 수의 각 파티션에서 같은 수의 행이없는 경우에 따라서 실패합니다.
나는 영업 이익은 스칼라를 사용하고 있었다 알고 있지만 나처럼, 당신은 pyspark에서이 작업을 수행하는 방법을 알아야이라면 아래의 파이썬 코드를 사용해보십시오. @ zero323 최초의 솔루션처럼 그것은 RDD.zip (의존) 모두 DataFrames 파티션 같은 수의 각 파티션에서 같은 수의 행이없는 경우에 따라서 실패합니다.
from pyspark.sql import Row from pyspark.sql.types import StructType def zipDataFrames(left, right): CombinedRow = Row(*left.columns + right.columns) def flattenRow(row): left = row[0] right = row[1] combinedVals = [left[col] for col in left.__fields__] + [right[col] for col in right.__fields__] return CombinedRow(*combinedVals) zippedRdd = left.rdd.zip(right.rdd).map(lambda row: flattenRow(row)) combinedSchema = StructType(left.schema.fields + right.schema.fields) return zippedRdd.toDF(combinedSchema) joined = zipDataFrames(a, b)
from https://stackoverflow.com/questions/32882529/how-to-zip-two-or-more-dataframe-in-spark by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] bufferSchema 성능 문제로 ArrayType와 UDAF 불꽃 (0) | 2019.11.03 |
---|---|
[SCALA] 컴파일 자바 7을 사용하는 SBT 설정? (0) | 2019.11.03 |
[SCALA] 스파크 DataFrame가 열이있는 경우 어떻게 감지 할 (0) | 2019.11.03 |
[SCALA] 스칼라의 동적 믹스 인 - 그것이 가능할까요? (0) | 2019.11.03 |
[SCALA] JavaFX는 완전히 창을 사용자 정의? (0) | 2019.11.03 |