[SCALA] 어떻게 정의하고 스파크 SQL에서 사용자 정의 집계 함수를 사용하는 방법?
SCALA어떻게 정의하고 스파크 SQL에서 사용자 정의 집계 함수를 사용하는 방법?
나는 스파크 SQL에서 UDF를 작성하는 방법을 알고 :
def belowThreshold(power: Int): Boolean = {
return power < -40
}
sqlContext.udf.register("belowThreshold", belowThreshold _)
나는 집계 함수를 정의하는 유사한 일을 할 수 있습니까? 어떻게 이런 일을합니까?
컨텍스트를 들어, 나는 다음과 같은 SQL 쿼리를 실행하려면 :
val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
FROM ifDF
WHERE opticalReceivePower IS NOT null
GROUP BY span, timestamp
ORDER BY span""")
그것은 뭔가를 반환해야합니다
행 (span1, 거짓, T0)
나는 임계 값 이하 범위와 타임 스탬프에 의해 정의 된 그룹의 opticalReceivePower에 대해 어떤 값이 있는지 집계 함수 나에게 말하고 싶어. 내가 위에 붙여 UDF 다르게 내 UDAF를 작성해야합니까?
해결법
-
==============================
1.스파크> = 2.3
스파크> = 2.3
벡터화 UDF (파이썬 전용) :
from pyspark.sql.functions import pandas_udf from pyspark.sql.functions import PandasUDFType from pyspark.sql.types import * import pandas as pd df = sc.parallelize([ ("a", 0), ("a", 1), ("b", 30), ("b", -50) ]).toDF(["group", "power"]) def below_threshold(threshold, group="group", power="power"): @pandas_udf("struct<group: string, below_threshold: boolean>", PandasUDFType.GROUPED_MAP) def below_threshold_(df): df = pd.DataFrame( df.groupby(group).apply(lambda x: (x[power] < threshold).any())) df.reset_index(inplace=True, drop=False) return df return below_threshold_
사용 예제 :
df.groupBy("group").apply(below_threshold(-40)).show() ## +-----+---------------+ ## |group|below_threshold| ## +-----+---------------+ ## | b| true| ## | a| false| ## +-----+---------------+
또한 (기능 파이썬 예제) PySpark에 GroupedData에 UDF를 적용 참조
스파크> = 2.0 (선택적 1.6하지만 약간 다른 API 포함) :
입력 된 데이터 집합에 게이터를 사용할 수있다 :
import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.{Encoder, Encoders} class BelowThreshold[I](f: I => Boolean) extends Aggregator[I, Boolean, Boolean] with Serializable { def zero = false def reduce(acc: Boolean, x: I) = acc | f(x) def merge(acc1: Boolean, acc2: Boolean) = acc1 | acc2 def finish(acc: Boolean) = acc def bufferEncoder: Encoder[Boolean] = Encoders.scalaBoolean def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean } val belowThreshold = new BelowThreshold[(String, Int)](_._2 < - 40).toColumn df.as[(String, Int)].groupByKey(_._1).agg(belowThreshold)
스파크> = 1.5 :
그것은 가장 가능성이 잔인한하지만 스파크 1.5에서는이 같은 UDAF를 만들 수 있습니다 :
import org.apache.spark.sql.expressions._ import org.apache.spark.sql.types._ import org.apache.spark.sql.Row object belowThreshold extends UserDefinedAggregateFunction { // Schema you get as an input def inputSchema = new StructType().add("power", IntegerType) // Schema of the row which is used for aggregation def bufferSchema = new StructType().add("ind", BooleanType) // Returned type def dataType = BooleanType // Self-explaining def deterministic = true // zero value def initialize(buffer: MutableAggregationBuffer) = buffer.update(0, false) // Similar to seqOp in aggregate def update(buffer: MutableAggregationBuffer, input: Row) = { if (!input.isNullAt(0)) buffer.update(0, buffer.getBoolean(0) | input.getInt(0) < -40) } // Similar to combOp in aggregate def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { buffer1.update(0, buffer1.getBoolean(0) | buffer2.getBoolean(0)) } // Called on exit to get return value def evaluate(buffer: Row) = buffer.getBoolean(0) }
사용 예제 :
df .groupBy($"group") .agg(belowThreshold($"power").alias("belowThreshold")) .show // +-----+--------------+ // |group|belowThreshold| // +-----+--------------+ // | a| false| // | b| true| // +-----+--------------+
1.4 해결 방법을 스파크 :
내가 제대로하지만 지금까지의 내가 여기 충분해야 평범한 구식 집계를 말할 수있는 당신의 요구 사항을 이해하면 나는 확실하지 않다 :
val df = sc.parallelize(Seq( ("a", 0), ("a", 1), ("b", 30), ("b", -50))).toDF("group", "power") df .withColumn("belowThreshold", ($"power".lt(-40)).cast(IntegerType)) .groupBy($"group") .agg(sum($"belowThreshold").notEqual(0).alias("belowThreshold")) .show // +-----+--------------+ // |group|belowThreshold| // +-----+--------------+ // | a| false| // | b| true| // +-----+--------------+
스파크 <= 1.4 :
지금까지 나는이 순간 (스파크 1.4.1)에서 하이브가 아닌 다른 UDAF에 대한 지원이 없습니다, 알고있다. (SPARK-3947 참조) 스파크 1.5 수 있어야한다.
내부적으로 스파크는 ImperativeAggregates 및 DeclarativeAggregates를 포함한 클래스의 번호를 사용합니다.
이 내부 사용을위한 것입니다 및 추가 통보없이 변경 될 수 있습니다, 그래서 아마 DeclarativeAggregate과 완전성 BelowThreshold이 (스파크 2.2-SNAPSHOT 테스트) 다음과 같이 구현 될 수에 대한되지 프로덕션 코드에 사용할 것을, 그러나입니다 :
import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types._ case class BelowThreshold(child: Expression, threshold: Expression) extends DeclarativeAggregate { override def children: Seq[Expression] = Seq(child, threshold) override def nullable: Boolean = false override def dataType: DataType = BooleanType private lazy val belowThreshold = AttributeReference( "belowThreshold", BooleanType, nullable = false )() // Used to derive schema override lazy val aggBufferAttributes = belowThreshold :: Nil override lazy val initialValues = Seq( Literal(false) ) override lazy val updateExpressions = Seq(Or( belowThreshold, If(IsNull(child), Literal(false), LessThan(child, threshold)) )) override lazy val mergeExpressions = Seq( Or(belowThreshold.left, belowThreshold.right) ) override lazy val evaluateExpression = belowThreshold override def defaultResult: Option[Literal] = Option(Literal(false)) }
그것은 더 withAggregateFunction의 등가 래핑한다.
from https://stackoverflow.com/questions/32100973/how-to-define-and-use-a-user-defined-aggregate-function-in-spark-sql by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 스칼라의 밀봉 특성의 반복? (0) | 2019.10.30 |
---|---|
[SCALA] 어떻게 스칼라에서 implicits 체인 수 있습니까? (0) | 2019.10.30 |
[SCALA] 어떻게하게 IntelliJ IDEA와 함께 SBT를 사용하여 동네 짱의 JAR (지방 JAR)을 구축? (0) | 2019.10.30 |
[SCALA] 은`#`연산자는 스칼라에서 무엇을 의미합니까? (0) | 2019.10.30 |
[SCALA] 스칼라의 VAR와 val 정의의 차이점은 무엇입니까? (0) | 2019.10.29 |