[SCALA] bufferSchema 성능 문제로 ArrayType와 UDAF 불꽃
SCALAbufferSchema 성능 문제로 ArrayType와 UDAF 불꽃
나는 요소의 배열을 반환하는 UDAF에서 일하고 있어요.
각각의 업데이트의 입력 인덱스 값 터플이다.
무엇 UDAF하는 일은 동일한 인덱스에서 모든 값을 요약하는 것입니다.
예:
(2,1), (3,1), (2,3), 입력 (지표 값)에 대
반환해야합니다 (0,0,4,1, ..., 0)
내 구현은 각 행에 대해 하나의 셀을 업데이트하지만, 논리가 잘 작동하지만 업데이트 방법에 문제가 그 방법을 실제로 복사 전체 배열의 마지막 과제 - 중복 매우 많은 시간이 소요됩니다.
혼자이 할당은 내 쿼리 실행 시간의 98 %를 담당합니다.
내 문제는 내가 그 시간을 줄일 수있는 방법이다? 그것은 전체 버퍼를 교체 할 필요없이 버퍼 배열에서 1의 값을 할당 할 수 있습니까?
P.S : 나는 스파크 1.6 함께 일하고 있어요, 나는 그것이 곧, 그래서이 버전에서 작동 할 솔루션에 충실하시기 바랍니다 업그레이드 할 수 없습니다.
class SumArrayAtIndexUDAF() extends UserDefinedAggregateFunction{
val bucketSize = 1000
def inputSchema: StructType = StructType(StructField("index",LongType) :: StructField("value",LongType) :: Nil)
def dataType: DataType = ArrayType(LongType)
def deterministic: Boolean = true
def bufferSchema: StructType = {
StructType(
StructField("buckets", ArrayType(LongType)) :: Nil
)
}
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = new Array[Long](bucketSize)
}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val index = input.getLong(0)
val value = input.getLong(1)
val arr = buffer.getAs[mutable.WrappedArray[Long]](0)
buffer(0) = arr // TODO THIS TAKES WAYYYYY TOO LONG - it actually copies the entire array for every call to this method (which essentially updates only 1 cell)
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val arr1 = buffer1.getAs[mutable.WrappedArray[Long]](0)
val arr2 = buffer2.getAs[mutable.WrappedArray[Long]](0)
for(i <- arr1.indices){
arr1.update(i, arr1(i) + arr2(i))
}
buffer1(0) = arr1
}
override def evaluate(buffer: Row): Any = {
buffer.getAs[mutable.WrappedArray[Long]](0)
}
}
해결법
-
==============================
1.TL; DR UDAF 중 어느 하나를 사용하거나 ArrayType 대신 기본 형식을 사용하지 않는다.
TL; DR UDAF 중 어느 하나를 사용하거나 ArrayType 대신 기본 형식을 사용하지 않는다.
UserDefinedFunction없이
두 솔루션은 내부 및 외부 표현 사이에 고가의 저글링을 생략한다.
표준 단위 및 피벗을 사용하여
이 표준 SQL 집계를 사용합니다. 내부적으로 최적화하는 동안 배열의 키의 수와 크기가 증가 할 때 비싼 수 있습니다.
주어진 입력 :
val df = Seq((1, 2, 1), (1, 3, 1), (1, 2, 3)).toDF("id", "index", "value")
당신은 할 수 있습니다 :
import org.apache.spark.sql.functions.{array, coalesce, col, lit} val nBuckets = 10 @transient val values = array( 0 until nBuckets map (c => coalesce(col(c.toString), lit(0))): _* ) df .groupBy("id") .pivot("index", 0 until nBuckets) .sum("value") .select($"id", values.alias("values"))
+---+--------------------+ | id| values| +---+--------------------+ | 1|[0, 0, 4, 1, 0, 0...| +---+--------------------+
combineByKey / aggregateByKey와 RDD API를 사용.
변경 가능한 버퍼와 일반 오래된 byKey 집계. 어떤 종과 호각하지만 입력의 광범위한 합리적으로 잘 수행 없습니다. 당신이 스파 스로 입력이 의심되는 경우, 당신은 변경 가능한지도 등보다 효율적인 중간 표현을 고려할 수 있습니다.
rdd .aggregateByKey(Array.fill(nBuckets)(0L))( { case (acc, (index, value)) => { acc(index) += value; acc }}, (acc1, acc2) => { for (i <- 0 until nBuckets) acc1(i) += acc2(i); acc1} ).toDF
+---+--------------------+ | _1| _2| +---+--------------------+ | 1|[0, 0, 4, 1, 0, 0...| +---+--------------------+
기본 유형과 UserDefinedFunction 사용
지금까지 내가 내부를 이해, 성능 병목 ArrayConverter.toCatalystImpl이다.
그것은 각 호출 MutableAggregationBuffer.update에 대해 호출처럼, 차례로 각각의 행에 대한 새로운 GenericArrayData를 할당합니다.
우리는 bufferSchema을로 재정의하는 경우 :
def bufferSchema: StructType = { StructType( 0 to nBuckets map (i => StructField(s"x$i", LongType)) ) }
업데이트 및 병합은 두 버퍼 프리미티브 값 일반 대체로서 표현 될 수있다. 콜 체인은 꽤 오래 남아있을 것이다, 그러나 그것은 복사 / 변환 미친 할당을 필요로하지 않습니다. 널 (null) 검사를 생략하면 다음과 유사한 뭔가를해야합니다
val index = input.getLong(0) buffer.update(index, buffer.getLong(index) + input.getLong(1))
과
for(i <- 0 to nBuckets){ buffer1.update(i, buffer1.getLong(i) + buffer2.getLong(i)) }
각기.
마지막으로 서열을 행을 출력으로 변환해야 평가 :
for (i <- 0 to nBuckets) yield buffer.getLong(i)
이 구현 가능한 병목 병합 있음을 유의하시기 바랍니다. 이 M 버킷과 함께, 새로운 성능 문제를 소개하지 말아야하지만, 병합하기 위해 각 호출은 O (M)입니다.
K 고유 키, 및 P 파티션으로 각 키가 각 파티션에 한 번 이상 발생하는 최악의 시나리오에서 M * K 번 호출됩니다. 이것은 효과적으로 O (M * N * K)에 병합 공모 성분을 증가시킨다.
일반적으로 당신이 그것에 대해 할 수있는 일은 거의 없다. 당신은 데이터 분산에 대한 특정 가정을 만들 그러나 경우에, 당신이 일을 조금 바로 가기, 먼저 셔플 수 있습니다 (데이터 스파 스를, 키 분배가 균일) :
df .repartition(n, $"key") .groupBy($"key") .agg(SumArrayAtIndexUDAF($"index", $"value"))
가정이 만족하는 경우 그것은해야한다 :
하나 또는 두 가정이 충족되지 그러나, 당신은 업데이트의 수는 동일하게 유지하면서 그 셔플의 크기가 증가 할 것으로 예상 할 수 있습니다. 셔플 - - 동시에 데이터 스큐 갱신보다 더 나쁜 일을 할 수있는 시나리오를 병합합니다.
"강하게"입력 데이터 세트와 어 그리 게이터를 사용 :
import org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.{Encoder, Encoders} class SumArrayAtIndex[I](f: I => (Int, Long))(bucketSize: Int) extends Aggregator[I, Array[Long], Seq[Long]] with Serializable { def zero = Array.fill(bucketSize)(0L) def reduce(acc: Array[Long], x: I) = { val (i, v) = f(x) acc(i) += v acc } def merge(acc1: Array[Long], acc2: Array[Long]) = { for { i <- 0 until bucketSize } acc1(i) += acc2(i) acc1 } def finish(acc: Array[Long]) = acc.toSeq def bufferEncoder: Encoder[Array[Long]] = Encoders.kryo[Array[Long]] def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder() }
다음과 같이 사용될 수있는
val ds = Seq((1, (1, 3L)), (1, (2, 5L)), (1, (0, 1L)), (1, (4, 6L))).toDS ds .groupByKey(_._1) .agg(new SumArrayAtIndex[(Int, (Int, Long))](_._2)(10).toColumn) .show(false)
+-----+-------------------------------+ |value|SumArrayAtIndex(scala.Tuple2) | +-----+-------------------------------+ |1 |[1, 3, 5, 0, 6, 0, 0, 0, 0, 0] | |2 |[0, 11, 0, 0, 0, 0, 0, 0, 0, 0]| +-----+-------------------------------+
노트 :
또한 SPARK-27296 참조 - 집계 함수 (UDAFs) 정의 사용자가 주요 효율성에 문제가
from https://stackoverflow.com/questions/47293454/spark-udaf-with-arraytype-as-bufferschema-performance-issues by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 의존하는 방법의 유형에 대한 몇 가지 매력적인 사용 사례는 무엇입니까? (0) | 2019.11.03 |
---|---|
[SCALA] 비동기 JDBC 호출 할 수 있습니까? (0) | 2019.11.03 |
[SCALA] 컴파일 자바 7을 사용하는 SBT 설정? (0) | 2019.11.03 |
[SCALA] 스파크에서 두 개 이상의 DataFrame을 압축하는 방법 (0) | 2019.11.03 |
[SCALA] 스파크 DataFrame가 열이있는 경우 어떻게 감지 할 (0) | 2019.11.03 |