복붙노트

[SCALA] SparkSQL 어떻게 사용자 정의 함수에서 null 값을 처리하는 방법?

SCALA

SparkSQL 어떻게 사용자 정의 함수에서 null 값을 처리하는 방법?

String 형의 한 열에 "X"로 표 1을 감안할 때. I는 "X"에 지정된 날짜 문자열의 정수를 표현하는 항목 "Y"와 함께 표 2를 생성 할.

필수 열 "Y"의 널 값을 유지하는 것이다.

표 1 (Dataframe DF1)

+----------+
|         x|
+----------+
|2015-09-12|
|2015-09-13|
|      null|
|      null|
+----------+
root
 |-- x: string (nullable = true)

표 2 (Dataframe DF2)

+----------+--------+                                                                  
|         x|       y|
+----------+--------+
|      null|    null|
|      null|    null|
|2015-09-12|20150912|
|2015-09-13|20150913|
+----------+--------+
root
 |-- x: string (nullable = true)
 |-- y: integer (nullable = true)

열 "Y"의 해당 칼럼에 "X"의 값으로 변환하기 위해 사용자 정의 기능 (UDF) 일 때 :

val extractDateAsInt = udf[Int, String] (
  (d:String) => d.substring(0, 10)
      .filterNot( "-".toSet)
      .toInt )

와 작품, null 값을 처리하는 것은 불가능합니다.

비록, 내가 좋아하는 뭔가를 할 수

val extractDateAsIntWithNull = udf[Int, String] (
  (d:String) => 
    if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt 
    else 1 )

나는 (의 int는 null이 될 수 없기 때문에, 물론) UDFS를 통해 null 값을 "생산"할 수있는 방법을 발견했다.

다음 DF2의 생성을위한 현재의 내 용액 (표 2)이다 :

// holds data of table 1  
val df1 = ... 

// filter entries from df1, that are not null
val dfNotNulls = df1.filter(df1("x")
  .isNotNull)
  .withColumn("y", extractDateAsInt(df1("x")))
  .withColumnRenamed("x", "right_x")

// create df2 via a left join on df1 and dfNotNull having 
val df2 = df1.join( dfNotNulls, df1("x") === dfNotNulls("right_x"), "leftouter" ).drop("right_x")

질문 :

코드 발췌

val extractDateAsNullableInt = udf[NullableInt, String] (
  (d:String) => 
    if (d != null) d.substring(0, 10).filterNot( "-".toSet).toInt 
    else null )

해결법

  1. ==============================

    1.이 편리 곳 Optioncomes입니다 :

    이 편리 곳 Optioncomes입니다 :

    val extractDateAsOptionInt = udf((d: String) => d match {
      case null => None
      case s => Some(s.substring(0, 10).filterNot("-".toSet).toInt)
    })
    

    또는 일반적인 경우는 약간 더 안전하게 만들 수 있습니다 :

    import scala.util.Try
    
    val extractDateAsOptionInt = udf((d: String) => Try(
      d.substring(0, 10).filterNot("-".toSet).toInt
    ).toOption)
    

    모든 신용이 아니라이 솔루션을 지적했습니다 드미트리 Selivanov 보낸 간다 (누락?)가 여기에 편집.

    대안은 UDF 외부 널 (null) 처리하는 것입니다 :

    import org.apache.spark.sql.functions.{lit, when}
    import org.apache.spark.sql.types.IntegerType
    
    val extractDateAsInt = udf(
       (d: String) => d.substring(0, 10).filterNot("-".toSet).toInt
    )
    
    df.withColumn("y",
      when($"x".isNull, lit(null))
        .otherwise(extractDateAsInt($"x"))
        .cast(IntegerType)
    )
    
  2. ==============================

    2.스칼라는 실제로 좋은 공장 기능, 옵션 (), 즉이 더욱 간결하게 만들 수있다 :

    스칼라는 실제로 좋은 공장 기능, 옵션 (), 즉이 더욱 간결하게 만들 수있다 :

    val extractDateAsOptionInt = udf((d: String) => 
      Option(d).map(_.substring(0, 10).filterNot("-".toSet).toInt))
    

    내부적으로 옵션의이 방법은 당신의 널 체크를하고 적용 대상 :

    def apply[A](x: A): Option[A] = if (x == null) None else Some(x)
    
  3. ==============================

    3.zero323 @의 좋은 대답으로, 나는 설명 null 값을 처리 가능한 사용자 정의 기능을 가지고, 다음 코드를 만들었습니다. 그것은 다른 사람을 위해 도움이됩니다 희망!

    zero323 @의 좋은 대답으로, 나는 설명 null 값을 처리 가능한 사용자 정의 기능을 가지고, 다음 코드를 만들었습니다. 그것은 다른 사람을 위해 도움이됩니다 희망!

    /**
     * Set of methods to construct [[org.apache.spark.sql.UserDefinedFunction]]s that
     * handle `null` values.
     */
    object NullableFunctions {
    
      import org.apache.spark.sql.functions._
      import scala.reflect.runtime.universe.{TypeTag}
      import org.apache.spark.sql.UserDefinedFunction
    
      /**
       * Given a function A1 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that
       *   * if fnc input is null, None is returned. This will create a null value in the output Spark column.
       *   * if A1 is non null, Some( f(input) will be returned, thus creating f(input) as value in the output column.
       * @param f function from A1 => RT
       * @tparam RT return type
       * @tparam A1 input parameter type
       * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above
       */
      def nullableUdf[RT: TypeTag, A1: TypeTag](f: Function1[A1, RT]): UserDefinedFunction = {
        udf[Option[RT],A1]( (i: A1) => i match {
          case null => None
          case s => Some(f(i))
        })
      }
    
      /**
       * Given a function A1, A2 => RT, create a [[org.apache.spark.sql.UserDefinedFunction]] such that
       *   * if on of the function input parameters is null, None is returned.
       *     This will create a null value in the output Spark column.
       *   * if both input parameters are non null, Some( f(input) will be returned, thus creating f(input1, input2)
       *     as value in the output column.
       * @param f function from A1 => RT
       * @tparam RT return type
       * @tparam A1 input parameter type
       * @tparam A2 input parameter type
       * @return a [[org.apache.spark.sql.UserDefinedFunction]] with the behaviour describe above
       */
      def nullableUdf[RT: TypeTag, A1: TypeTag, A2: TypeTag](f: Function2[A1, A2, RT]): UserDefinedFunction = {
        udf[Option[RT], A1, A2]( (i1: A1, i2: A2) =>  (i1, i2) match {
          case (null, _) => None
          case (_, null) => None
          case (s1, s2) => Some((f(s1,s2)))
        } )
      }
    }
    
  4. from https://stackoverflow.com/questions/32357164/sparksql-how-to-deal-with-null-values-in-user-defined-function by cc-by-sa and MIT license