복붙노트

[SCALA] bufferSchema 성능 문제로 ArrayType와 UDAF 불꽃

SCALA

bufferSchema 성능 문제로 ArrayType와 UDAF 불꽃

나는 요소의 배열을 반환하는 UDAF에서 일하고 있어요.

각각의 업데이트의 입력 인덱스 값 터플이다.

무엇 UDAF하는 일은 동일한 인덱스에서 모든 값을 요약하는 것입니다.

예:

(2,1), (3,1), (2,3), 입력 (지표 값)에 대

반환해야합니다 (0,0,4,1, ..., 0)

내 구현은 각 행에 대해 하나의 셀을 업데이트하지만, 논리가 잘 작동하지만 업데이트 방법에 문제가 그 방법을 실제로 복사 전체 배열의 마지막 과제 - 중복 매우 많은 시간이 소요됩니다.

혼자이 할당은 내 쿼리 실행 시간의 98 %를 담당합니다.

내 문제는 내가 그 시간을 줄일 수있는 방법이다? 그것은 전체 버퍼를 교체 할 필요없이 버퍼 배열에서 1의 값을 할당 할 수 있습니까?

P.S : 나는 스파크 1.6 함께 일하고 있어요, 나는 그것이 곧, 그래서이 버전에서 작동 할 솔루션에 충실하시기 바랍니다 업그레이드 할 수 없습니다.

class SumArrayAtIndexUDAF() extends UserDefinedAggregateFunction{

  val bucketSize = 1000

  def inputSchema: StructType =  StructType(StructField("index",LongType) :: StructField("value",LongType) :: Nil)

  def dataType: DataType = ArrayType(LongType)

  def deterministic: Boolean = true

  def bufferSchema: StructType = {
    StructType(
      StructField("buckets", ArrayType(LongType)) :: Nil  
    )
  }

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = new Array[Long](bucketSize)
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val index = input.getLong(0)
    val value = input.getLong(1)

    val arr = buffer.getAs[mutable.WrappedArray[Long]](0)

    buffer(0) = arr   // TODO THIS TAKES WAYYYYY TOO LONG - it actually copies the entire array for every call to this method (which essentially updates only 1 cell)
  }

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val arr1 = buffer1.getAs[mutable.WrappedArray[Long]](0)
    val arr2 = buffer2.getAs[mutable.WrappedArray[Long]](0)

    for(i <- arr1.indices){
      arr1.update(i, arr1(i) + arr2(i))
    }

    buffer1(0) = arr1
  }

  override def evaluate(buffer: Row): Any = {
    buffer.getAs[mutable.WrappedArray[Long]](0)
  }
}

해결법

  1. ==============================

    1.TL; DR UDAF 중 어느 하나를 사용하거나 ArrayType 대신 기본 형식을 사용하지 않는다.

    TL; DR UDAF 중 어느 하나를 사용하거나 ArrayType 대신 기본 형식을 사용하지 않는다.

    UserDefinedFunction없이

    두 솔루션은 내부 및 외부 표현 사이에 고가의 저글링을 생략한다.

    표준 단위 및 피벗을 사용하여

    이 표준 SQL 집계를 사용합니다. 내부적으로 최적화하는 동안 배열의 키의 수와 크기가 증가 할 때 비싼 수 있습니다.

    주어진 입력 :

    val df = Seq((1, 2, 1), (1, 3, 1), (1, 2, 3)).toDF("id", "index", "value")
    

    당신은 할 수 있습니다 :

    import org.apache.spark.sql.functions.{array, coalesce, col, lit}
    
    val nBuckets = 10
    @transient val values = array(
      0 until nBuckets map (c => coalesce(col(c.toString), lit(0))): _*
    )
    
    df
      .groupBy("id")
      .pivot("index", 0 until nBuckets)
      .sum("value")
      .select($"id", values.alias("values"))
    
    +---+--------------------+                                                      
    | id|              values|
    +---+--------------------+
    |  1|[0, 0, 4, 1, 0, 0...|
    +---+--------------------+
    

    combineByKey / aggregateByKey와 RDD API를 사용.

    변경 가능한 버퍼와 일반 오래된 byKey 집계. 어떤 종과 호각하지만 입력의 광범위한 합리적으로 잘 수행 없습니다. 당신이 스파 스로 입력이 의심되는 경우, 당신은 변경 가능한지도 등보다 효율적인 중간 표현을 고려할 수 있습니다.

    rdd
      .aggregateByKey(Array.fill(nBuckets)(0L))(
        { case (acc, (index, value)) => { acc(index) += value; acc }},
        (acc1, acc2) => { for (i <- 0 until nBuckets) acc1(i) += acc2(i); acc1}
      ).toDF
    
    +---+--------------------+
    | _1|                  _2|
    +---+--------------------+
    |  1|[0, 0, 4, 1, 0, 0...|
    +---+--------------------+
    

    기본 유형과 UserDefinedFunction 사용

    지금까지 내가 내부를 이해, 성능 병목 ArrayConverter.toCatalystImpl이다.

    그것은 각 호출 MutableAggregationBuffer.update에 대해 호출처럼, 차례로 각각의 행에 대한 새로운 GenericArrayData를 할당합니다.

    우리는 bufferSchema을로 재정의하는 경우 :

    def bufferSchema: StructType = {
      StructType(
        0 to nBuckets map (i => StructField(s"x$i", LongType))
      )
    }
    

    업데이트 및 병합은 두 버퍼 프리미티브 값 일반 대체로서 표현 될 수있다. 콜 체인은 꽤 오래 남아있을 것이다, 그러나 그것은 복사 / 변환 미친 할당을 필요로하지 않습니다. 널 (null) 검사를 생략하면 다음과 유사한 뭔가를해야합니다

    val index = input.getLong(0)
    buffer.update(index, buffer.getLong(index) + input.getLong(1))
    

    for(i <- 0 to nBuckets){
      buffer1.update(i, buffer1.getLong(i) + buffer2.getLong(i))
    }
    

    각기.

    마지막으로 서열을 행을 출력으로 변환해야 평가 :

     for (i <- 0 to nBuckets)  yield buffer.getLong(i)
    

    이 구현 가능한 병목 병합 있음을 유의하시기 바랍니다. 이 M 버킷과 함께, 새로운 성능 문제를 소개하지 말아야하지만, 병합하기 위해 각 호출은 O (M)입니다.

    K 고유 ​​키, 및 P 파티션으로 각 키가 각 파티션에 한 번 이상 발생하는 최악의 시나리오에서 M * K 번 호출됩니다. 이것은 효과적으로 O (M * N * K)에 병합 공모 성분을 증가시킨다.

    일반적으로 당신이 그것에 대해 할 수있는 일은 거의 없다. 당신은 데이터 분산에 대한 특정 가정을 만들 그러나 경우에, 당신이 일을 조금 바로 가기, 먼저 셔플 수 있습니다 (데이터 스파 스를, 키 분배가 균일) :

    df
      .repartition(n, $"key")
      .groupBy($"key")
      .agg(SumArrayAtIndexUDAF($"index", $"value"))
    

    가정이 만족하는 경우 그것은해야한다 :

    하나 또는 두 가정이 충족되지 그러나, 당신은 업데이트의 수는 동일하게 유지하면서 그 셔플의 크기가 증가 할 것으로 예상 할 수 있습니다. 셔플 - - 동시에 데이터 스큐 갱신보다 더 나쁜 일을 할 수있는 시나리오를 병합합니다.

    "강하게"입력 데이터 세트와 어 그리 게이터를 사용 :

    import org.apache.spark.sql.expressions.Aggregator
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    import org.apache.spark.sql.{Encoder, Encoders}
    
    class SumArrayAtIndex[I](f: I => (Int, Long))(bucketSize: Int)  extends Aggregator[I, Array[Long], Seq[Long]]
        with Serializable {
      def zero = Array.fill(bucketSize)(0L)
      def reduce(acc: Array[Long], x: I) = {
        val (i, v) = f(x)
        acc(i) += v
        acc
      }
    
      def merge(acc1: Array[Long], acc2: Array[Long]) = {
        for {
          i <- 0 until bucketSize
        } acc1(i) += acc2(i)
        acc1
      }
    
      def finish(acc: Array[Long]) = acc.toSeq
    
      def bufferEncoder: Encoder[Array[Long]] = Encoders.kryo[Array[Long]]
      def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
    }
    

    다음과 같이 사용될 수있는

    val ds = Seq((1, (1, 3L)), (1, (2, 5L)), (1, (0, 1L)), (1, (4, 6L))).toDS
    
    ds
      .groupByKey(_._1)
      .agg(new SumArrayAtIndex[(Int, (Int, Long))](_._2)(10).toColumn)
      .show(false)
    
    +-----+-------------------------------+
    |value|SumArrayAtIndex(scala.Tuple2)  |
    +-----+-------------------------------+
    |1    |[1, 3, 5, 0, 6, 0, 0, 0, 0, 0] |
    |2    |[0, 11, 0, 0, 0, 0, 0, 0, 0, 0]|
    +-----+-------------------------------+
    

    노트 :

    또한 SPARK-27296 참조 - 집계 함수 (UDAFs) 정의 사용자가 주요 효율성에 문제가

  2. from https://stackoverflow.com/questions/47293454/spark-udaf-with-arraytype-as-bufferschema-performance-issues by cc-by-sa and MIT license