복붙노트

[SCALA] 아파치 스파크에 null 값을 포함하면 가입

SCALA

아파치 스파크에 null 값을 포함하면 가입

나는 아파치 스파크 가입에 null 값을 포함 할 것이다. 스파크는 기본적으로 널 (null)로 행을 포함하지 않습니다.

여기에 기본 점화 동작입니다.

val numbersDf = Seq(
  ("123"),
  ("456"),
  (null),
  ("")
).toDF("numbers")

val lettersDf = Seq(
  ("123", "abc"),
  ("456", "def"),
  (null, "zzz"),
  ("", "hhh")
).toDF("numbers", "letters")

val joinedDf = numbersDf.join(lettersDf, Seq("numbers"))

여기 joinedDf.show ()의 출력은 :

+-------+-------+
|numbers|letters|
+-------+-------+
|    123|    abc|
|    456|    def|
|       |    hhh|
+-------+-------+

이것은 내가하고 싶은 출력 :

+-------+-------+
|numbers|letters|
+-------+-------+
|    123|    abc|
|    456|    def|
|       |    hhh|
|   null|    zzz|
+-------+-------+

해결법

  1. ==============================

    1.스파크는 특별한 NULL 안전 항등 연산자를 제공한다 :

    스파크는 특별한 NULL 안전 항등 연산자를 제공한다 :

    numbersDf
      .join(lettersDf, numbersDf("numbers") <=> lettersDf("numbers"))
      .drop(lettersDf("numbers"))
    
    +-------+-------+
    |numbers|letters|
    +-------+-------+
    |    123|    abc|
    |    456|    def|
    |   null|    zzz|
    |       |    hhh|
    +-------+-------+
    

    이전 스파크 1.5과 함께 사용하지 않도록주의하십시오. 1.6 스파크에 앞서 그것은 데카르트 제품 (- 빠른 널 안전에 가입 SPARK-11111)이 필요합니다.

    스파크 2.3.0 이상에서는 당신은 PySpark에 Column.eqNullSafe을 사용할 수 있습니다 :

    numbers_df = sc.parallelize([
        ("123", ), ("456", ), (None, ), ("", )
    ]).toDF(["numbers"])
    
    letters_df = sc.parallelize([
        ("123", "abc"), ("456", "def"), (None, "zzz"), ("", "hhh")
    ]).toDF(["numbers", "letters"])
    
    numbers_df.join(letters_df, numbers_df.numbers.eqNullSafe(letters_df.numbers))
    
    +-------+-------+-------+
    |numbers|numbers|letters|
    +-------+-------+-------+
    |    456|    456|    def|
    |   null|   null|    zzz|
    |       |       |    hhh|
    |    123|    123|    abc|
    +-------+-------+-------+
    

    및 % <=> SparkR에서 %

    numbers_df <- createDataFrame(data.frame(numbers = c("123", "456", NA, "")))
    letters_df <- createDataFrame(data.frame(
      numbers = c("123", "456", NA, ""),
      letters = c("abc", "def", "zzz", "hhh")
    ))
    
    head(join(numbers_df, letters_df, numbers_df$numbers %<=>% letters_df$numbers))
    
      numbers numbers letters
    1     456     456     def
    2    <NA>    <NA>     zzz
    3                     hhh
    4     123     123     abc
    

    SQL (스파크 2.2.0+) 사용할 수와 함께 DISTINCT FROM되지 않습니다 :

    SELECT * FROM numbers JOIN letters 
    ON numbers.numbers IS NOT DISTINCT FROM letters.numbers
    

    이것은뿐만 아니라 DataFrame API를 사용할 수있다 :

    numbersDf.alias("numbers")
      .join(lettersDf.alias("letters"))
      .where("numbers.numbers IS NOT DISTINCT FROM letters.numbers")
    
  2. ==============================

    2.

    val numbers2 = numbersDf.withColumnRenamed("numbers","num1") //rename columns so that we can disambiguate them in the join
    val letters2 = lettersDf.withColumnRenamed("numbers","num2")
    val joinedDf = numbers2.join(letters2, $"num1" === $"num2" || ($"num1".isNull &&  $"num2".isNull) ,"outer")
    joinedDf.select("num1","letters").withColumnRenamed("num1","numbers").show  //rename the columns back to the original names
    
  3. ==============================

    3.K L의 아이디어를 바탕으로, 당신은 열 식을 결합 생성 foldLeft을 사용할 수 있습니다 :

    K L의 아이디어를 바탕으로, 당신은 열 식을 결합 생성 foldLeft을 사용할 수 있습니다 :

    def nullSafeJoin(rightDF: DataFrame, columns: Seq[String], joinType: String)(leftDF: DataFrame): DataFrame = 
    {
    
      val colExpr: Column = leftDF(columns.head) <=> rightDF(columns.head)
      val fullExpr = columns.tail.foldLeft(colExpr) { 
        (colExpr, p) => colExpr && leftDF(p) <=> rightDF(p) 
      }
    
      leftDF.join(rightDF, fullExpr, joinType)
    }
    

    다음,이 함수를 호출 할 수처럼 :

    aDF.transform(nullSafejoin(bDF, columns, joinType))
    
  4. ==============================

    4.가입 연산자의 결과에 널 (null) 행을 포함하는 다음과 같은 방법을 시도해보십시오

    가입 연산자의 결과에 널 (null) 행을 포함하는 다음과 같은 방법을 시도해보십시오

    def nullSafeJoin(leftDF: DataFrame, rightDF: DataFrame, columns: Seq[String], joinType: String): DataFrame = {
    
        var columnsExpr: Column = leftDF(columns.head) <=> rightDF(columns.head)
    
        columns.drop(1).foreach(column => {
            columnsExpr = columnsExpr && (leftDF(column) <=> rightDF(column))
        })
    
        var joinedDF: DataFrame = leftDF.join(rightDF, columnsExpr, joinType)
    
        columns.foreach(column => {
            joinedDF = joinedDF.drop(leftDF(column))
        })
    
        joinedDF
    }
    
  5. from https://stackoverflow.com/questions/41728762/including-null-values-in-an-apache-spark-join by cc-by-sa and MIT license