복붙노트

[SCALA] 어떻게 스파크 SQL에서 UDF를에 추가 매개 변수를 전달할 수 있습니다?

SCALA

어떻게 스파크 SQL에서 UDF를에 추가 매개 변수를 전달할 수 있습니다?

I는 DataFrame에서 날짜 열 분석하려는 각각의 날짜 열을위한 날짜의 해상도를 변경할 수있다 (즉 2011년 1월 10일 해상도가 "월"로하면 => 2,011 / 01).

나는 다음과 같은 코드를 작성 :

def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame =
{
  import org.apache.spark.sql.functions._
  val convertDateFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)}
  val convertDateTimeFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)}

  val allColNames = dataframe.columns
  val allCols = allColNames.map(name => dataframe.col(name))

  val mappedCols =
  {
    for(i <- allCols.indices) yield
    {
      schema(i) match
      {
        case FieldDataType.Date => convertDateFunc(allCols(i), resolution(i)))
        case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i))
        case _ => allCols(i)
      }
    }
  }

  dataframe.select(mappedCols:_*)

}}

그러나이 작동하지 않습니다. 내가 UDF 만에 열을 전달할 수있는 것 같다. 그것이 내가 RDD에 DataFrame을 변환 할 경우 매우 느려질 수 및 각 행에 기능을 적용한다면 궁금하다.

사람이 올바른 해결책을 알고 있나요? 감사합니다!

해결법

  1. ==============================

    1.그냥 태닝 약간의 사용 :

    그냥 태닝 약간의 사용 :

    def convertDateFunc(resolution: DateResolutionType) = udf((x:String) => 
      SparkDateTimeConverter.convertDate(x, resolution))
    

    다음과 같이 그것을 사용 :

    case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i))
    

    측면에 당신이 sql.functions.trunc 및 sql.functions.date_format 살펴 보셔야 있습니다. 이러한 모든 UDF를 사용하지 않고 작업의 적어도 일부해야한다.

    노트 :

    스파크 2.2 이상 당신은 typedLit 기능을 사용할 수 있습니다 :

    import org.apache.spark.sql.functions.typedLit
    

    어떤 서열 또는지도 같은 리터럴의 넓은 범위를 지원합니다.

  2. ==============================

    2.당신은 org.apache.spark.sql.functions에 정의 된 점등 (...) 함수를 사용하여 UDF로 전달하는 문자 열을 생성 할 수 있습니다

    당신은 org.apache.spark.sql.functions에 정의 된 점등 (...) 함수를 사용하여 UDF로 전달하는 문자 열을 생성 할 수 있습니다

    예를 들면 :

    val takeRight = udf((s: String, i: Int) => s.takeRight(i))
    df.select(takeRight($"stringCol", lit(1)))
    
  3. from https://stackoverflow.com/questions/35546576/how-can-i-pass-extra-parameters-to-udfs-in-spark-sql by cc-by-sa and MIT license