복붙노트

[SCALA] 어떻게 정의하고 스파크 SQL에서 사용자 정의 집계 함수를 사용하는 방법?

SCALA

어떻게 정의하고 스파크 SQL에서 사용자 정의 집계 함수를 사용하는 방법?

나는 스파크 SQL에서 UDF를 작성하는 방법을 알고 :

def belowThreshold(power: Int): Boolean = {
        return power < -40
      }

sqlContext.udf.register("belowThreshold", belowThreshold _)

나는 집계 함수를 정의하는 유사한 일을 할 수 있습니까? 어떻게 이런 일을합니까?

컨텍스트를 들어, 나는 다음과 같은 SQL 쿼리를 실행하려면 :

val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
                                    FROM ifDF
                                    WHERE opticalReceivePower IS NOT null
                                    GROUP BY span, timestamp
                                    ORDER BY span""")

그것은 뭔가를 반환해야합니다

행 (span1, 거짓, T0)

나는 임계 값 이하 범위와 타임 스탬프에 의해 정의 된 그룹의 opticalReceivePower에 대해 어떤 값이 있는지 집계 함수 나에게 말하고 싶어. 내가 위에 붙여 UDF 다르게 내 UDAF를 작성해야합니까?

해결법

  1. ==============================

    1.스파크> = 2.3

    스파크> = 2.3

    벡터화 UDF (파이썬 전용) :

    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.functions import PandasUDFType
    
    from pyspark.sql.types import *
    import pandas as pd
    
    df = sc.parallelize([
        ("a", 0), ("a", 1), ("b", 30), ("b", -50)
    ]).toDF(["group", "power"])
    
    def below_threshold(threshold, group="group", power="power"):
        @pandas_udf("struct<group: string, below_threshold: boolean>", PandasUDFType.GROUPED_MAP)
        def below_threshold_(df):
            df = pd.DataFrame(
               df.groupby(group).apply(lambda x: (x[power] < threshold).any()))
            df.reset_index(inplace=True, drop=False)
            return df
    
        return below_threshold_
    

    사용 예제 :

    df.groupBy("group").apply(below_threshold(-40)).show()
    
    ## +-----+---------------+
    ## |group|below_threshold|
    ## +-----+---------------+
    ## |    b|           true|
    ## |    a|          false|
    ## +-----+---------------+
    

    또한 (기능 파이썬 예제) PySpark에 GroupedData에 UDF를 적용 참조

    스파크> = 2.0 (선택적 1.6하지만 약간 다른 API 포함) :

    입력 된 데이터 집합에 게이터를 사용할 수있다 :

    import org.apache.spark.sql.expressions.Aggregator
    import org.apache.spark.sql.{Encoder, Encoders}
    
    class BelowThreshold[I](f: I => Boolean)  extends Aggregator[I, Boolean, Boolean]
        with Serializable {
      def zero = false
      def reduce(acc: Boolean, x: I) = acc | f(x)
      def merge(acc1: Boolean, acc2: Boolean) = acc1 | acc2
      def finish(acc: Boolean) = acc
    
      def bufferEncoder: Encoder[Boolean] = Encoders.scalaBoolean
      def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean
    }
    
    val belowThreshold = new BelowThreshold[(String, Int)](_._2 < - 40).toColumn
    df.as[(String, Int)].groupByKey(_._1).agg(belowThreshold)
    

    스파크> = 1.5 :

    그것은 가장 가능성이 잔인한하지만 스파크 1.5에서는이 같은 UDAF를 만들 수 있습니다 :

    import org.apache.spark.sql.expressions._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    
    object belowThreshold extends UserDefinedAggregateFunction {
        // Schema you get as an input
        def inputSchema = new StructType().add("power", IntegerType)
        // Schema of the row which is used for aggregation
        def bufferSchema = new StructType().add("ind", BooleanType)
        // Returned type
        def dataType = BooleanType
        // Self-explaining 
        def deterministic = true
        // zero value
        def initialize(buffer: MutableAggregationBuffer) = buffer.update(0, false)
        // Similar to seqOp in aggregate
        def update(buffer: MutableAggregationBuffer, input: Row) = {
            if (!input.isNullAt(0))
              buffer.update(0, buffer.getBoolean(0) | input.getInt(0) < -40)
        }
        // Similar to combOp in aggregate
        def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
          buffer1.update(0, buffer1.getBoolean(0) | buffer2.getBoolean(0))    
        }
        // Called on exit to get return value
        def evaluate(buffer: Row) = buffer.getBoolean(0)
    }
    

    사용 예제 :

    df
      .groupBy($"group")
      .agg(belowThreshold($"power").alias("belowThreshold"))
      .show
    
    // +-----+--------------+
    // |group|belowThreshold|
    // +-----+--------------+
    // |    a|         false|
    // |    b|          true|
    // +-----+--------------+
    

    1.4 해결 방법을 스파크 :

    내가 제대로하지만 지금까지의 내가 여기 충분해야 평범한 구식 집계를 말할 수있는 당신의 요구 사항을 이해하면 나는 확실하지 않다 :

    val df = sc.parallelize(Seq(
        ("a", 0), ("a", 1), ("b", 30), ("b", -50))).toDF("group", "power")
    
    df
      .withColumn("belowThreshold", ($"power".lt(-40)).cast(IntegerType))
      .groupBy($"group")
      .agg(sum($"belowThreshold").notEqual(0).alias("belowThreshold"))
      .show
    
    // +-----+--------------+
    // |group|belowThreshold|
    // +-----+--------------+
    // |    a|         false|
    // |    b|          true|
    // +-----+--------------+
    

    스파크 <= 1.4 :

    지금까지 나는이 순간 (스파크 1.4.1)에서 하이브가 아닌 다른 UDAF에 대한 지원이 없습니다, 알고있다. (SPARK-3947 참조) 스파크 1.5 수 있어야한다.

    내부적으로 스파크는 ImperativeAggregates 및 DeclarativeAggregates를 포함한 클래스의 번호를 사용합니다.

    이 내부 사용을위한 것입니다 및 추가 통보없이 변경 될 수 있습니다, 그래서 아마 DeclarativeAggregate과 완전성 BelowThreshold이 (스파크 2.2-SNAPSHOT 테스트) 다음과 같이 구현 될 수에 대한되지 프로덕션 코드에 사용할 것을, 그러나입니다 :

    import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.types._
    
    case class BelowThreshold(child: Expression, threshold: Expression) 
        extends  DeclarativeAggregate  {
      override def children: Seq[Expression] = Seq(child, threshold)
    
      override def nullable: Boolean = false
      override def dataType: DataType = BooleanType
    
      private lazy val belowThreshold = AttributeReference(
        "belowThreshold", BooleanType, nullable = false
      )()
    
      // Used to derive schema
      override lazy val aggBufferAttributes = belowThreshold :: Nil
    
      override lazy val initialValues = Seq(
        Literal(false)
      )
    
      override lazy val updateExpressions = Seq(Or(
        belowThreshold,
        If(IsNull(child), Literal(false), LessThan(child, threshold))
      ))
    
      override lazy val mergeExpressions = Seq(
        Or(belowThreshold.left, belowThreshold.right)
      )
    
      override lazy val evaluateExpression = belowThreshold
      override def defaultResult: Option[Literal] = Option(Literal(false))
    } 
    

    그것은 더 withAggregateFunction의 등가 래핑한다.

  2. from https://stackoverflow.com/questions/32100973/how-to-define-and-use-a-user-defined-aggregate-function-in-spark-sql by cc-by-sa and MIT license