복붙노트

[SCALA] 스파크에서 두 개 이상의 DataFrame을 압축하는 방법

SCALA

스파크에서 두 개 이상의 DataFrame을 압축하는 방법

나는 두 DataFrame A와 B가있다. a는 같다

Column 1 | Column 2
abc      |  123
cde      |  23 

B는 같다

Column 1 
1      
2      

나는 a와 b (또는 그 이상) 같은 것을하게 DataFrames을 압축 할 :

Column 1 | Column 2 | Column 3
abc      |  123     |   1
cde      |  23      |   2

내가 어떻게 해?

해결법

  1. ==============================

    1.이 같은 작업은 DataFrame의 API를 지원하지 않습니다. 두 RDDs를 압축 할 수 있지만, 당신은 파티션 당 요소의 두 파티션의 수와 수와 일치해야 작동하게 할 수 있습니다. 이 경우 가정 :

    이 같은 작업은 DataFrame의 API를 지원하지 않습니다. 두 RDDs를 압축 할 수 있지만, 당신은 파티션 당 요소의 두 파티션의 수와 수와 일치해야 작동하게 할 수 있습니다. 이 경우 가정 :

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructField, StructType, LongType}
    
    val a: DataFrame = sc.parallelize(Seq(
      ("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
    val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3")
    
    // Merge rows
    val rows = a.rdd.zip(b.rdd).map{
      case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)}
    
    // Merge schemas
    val schema = StructType(a.schema.fields ++ b.schema.fields)
    
    // Create new data frame
    val ab: DataFrame = sqlContext.createDataFrame(rows, schema)
    

    위의 조건은 인덱스와 조인을 추가하고 마음에 오는 유일한 옵션을 충족하지 않는 경우 :

    def addIndex(df: DataFrame) = sqlContext.createDataFrame(
      // Add index
      df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)},
      // Create schema
      StructType(df.schema.fields :+ StructField("_index", LongType, false))
    )
    
    // Add indices
    val aWithIndex = addIndex(a)
    val bWithIndex = addIndex(b)
    
    // Join and clean
    val ab = aWithIndex
      .join(bWithIndex, Seq("_index"))
      .drop("_index")
    
  2. ==============================

    2.Dataframes의 스칼라의 구현에서는, 하나에 두 개의 dataframes을 연결하는 간단한 방법이 없습니다. 우리는 단순히 dataframes의 각 행에 인덱스를 추가하여이 문제를 해결 할 수 있습니다. 그런 다음, 우리는 않는 내부 이러한 인덱스에 의해 가입 할 수 있습니다. 이것은이 구현의 내 스텁 코드입니다 :

    Dataframes의 스칼라의 구현에서는, 하나에 두 개의 dataframes을 연결하는 간단한 방법이 없습니다. 우리는 단순히 dataframes의 각 행에 인덱스를 추가하여이 문제를 해결 할 수 있습니다. 그런 다음, 우리는 않는 내부 이러한 인덱스에 의해 가입 할 수 있습니다. 이것은이 구현의 내 스텁 코드입니다 :

    val a: DataFrame = sc.parallelize(Seq(("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
    val aWithId: DataFrame = a.withColumn("id",monotonicallyIncreasingId)
    
    val b: DataFrame = sc.parallelize(Seq((1), (2))).toDF("column_3")
    val bWithId: DataFrame = b.withColumn("id",monotonicallyIncreasingId)
    
    aWithId.join(bWithId, "id")
    

    약간 가벼운 독서 - 파이썬은이 작업을 수행하는 방법을 확인하세요!

  3. ==============================

    3.무엇 순수 SQL에 대한?

    무엇 순수 SQL에 대한?

    SELECT 
        room_name, 
        sender_nickname, 
        message_id, 
        row_number() over (partition by room_name order by message_id) as message_index, 
        row_number() over (partition by room_name, sender_nickname order by message_id) as user_message_index
    from messages
    order by room_name, message_id
    
  4. ==============================

    4.나는 영업 이익은 스칼라를 사용하고 있었다 알고 있지만 나처럼, 당신은 pyspark에서이 작업을 수행하는 방법을 알아야이라면 아래의 파이썬 코드를 사용해보십시오. @ zero323 최초의 솔루션처럼 그것은 RDD.zip (의존) 모두 DataFrames 파티션 같은 수의 각 파티션에서 같은 수의 행이없는 경우에 따라서 실패합니다.

    나는 영업 이익은 스칼라를 사용하고 있었다 알고 있지만 나처럼, 당신은 pyspark에서이 작업을 수행하는 방법을 알아야이라면 아래의 파이썬 코드를 사용해보십시오. @ zero323 최초의 솔루션처럼 그것은 RDD.zip (의존) 모두 DataFrames 파티션 같은 수의 각 파티션에서 같은 수의 행이없는 경우에 따라서 실패합니다.

    from pyspark.sql import Row
    from pyspark.sql.types import StructType
    
    def zipDataFrames(left, right):
        CombinedRow = Row(*left.columns + right.columns)
    
        def flattenRow(row):
            left = row[0]
            right = row[1]
            combinedVals = [left[col] for col in left.__fields__] + [right[col] for col in right.__fields__]
            return CombinedRow(*combinedVals)
    
        zippedRdd = left.rdd.zip(right.rdd).map(lambda row: flattenRow(row))        
        combinedSchema = StructType(left.schema.fields + right.schema.fields)        
        return zippedRdd.toDF(combinedSchema)
    
    joined = zipDataFrames(a, b)
    
  5. from https://stackoverflow.com/questions/32882529/how-to-zip-two-or-more-dataframe-in-spark by cc-by-sa and MIT license