복붙노트

[SCALA] 어떻게 벡터의 열을 합계를 사용자 정의 집계 함수를 정의?

SCALA

어떻게 벡터의 열을 합계를 사용자 정의 집계 함수를 정의?

나는 두 개의 열을 입력 지능 입력 벡터의 VEC (org.apache.spark.mllib.linalg.Vector)의 ID의 DataFrame 있습니다.

DataFrame은 다음과 같다 :

ID,Vec
1,[0,0,5]
1,[4,0,1]
1,[1,2,1]
2,[7,5,0]
2,[3,3,4]
3,[0,8,1]
3,[0,0,1]
3,[7,7,7]
....

나는 GROUPBY ($ "ID")를 수행 한 후 벡터를 합산하여 각 그룹 내부의 행에 집계를 적용하고 싶습니다.

상기 예는 원하는 출력 될 것이다 :

ID,SumOfVectors
1,[5,2,7]
2,[10,8,4]
3,[7,15,9]
...

사용 가능한 집계 함수는, 예를 들어, 작동하지 않습니다 df.groupBy ($ "ID"). AGG (SUM ($ "VEC는")는 ClassCastException이 이어질 것이다.

어떻게 나를 벡터 또는 배열이나 다른 사용자 작업의 합을 수행 할 수 있습니다 사용자 정의 집계 함수를 구현하는 방법?

해결법

  1. ==============================

    1.개인적으로 나는 UDAFs 귀찮게하지 않을 것입니다. 자세한 것보다 더 정확히 빨리있다 (bufferSchema 성능 문제로 ArrayType와 UDAF 스파크) 대신 단순히 사용하는 것이 reduceByKey / foldByKey :

    개인적으로 나는 UDAFs 귀찮게하지 않을 것입니다. 자세한 것보다 더 정확히 빨리있다 (bufferSchema 성능 문제로 ArrayType와 UDAF 스파크) 대신 단순히 사용하는 것이 reduceByKey / foldByKey :

    import org.apache.spark.sql.Row
    import breeze.linalg.{DenseVector => BDV}
    import org.apache.spark.ml.linalg.{Vector, Vectors}
    
    def dv(values: Double*): Vector = Vectors.dense(values.toArray)
    
    val df = spark.createDataFrame(Seq(
        (1, dv(0,0,5)), (1, dv(4,0,1)), (1, dv(1,2,1)),
        (2, dv(7,5,0)), (2, dv(3,3,4)), 
        (3, dv(0,8,1)), (3, dv(0,0,1)), (3, dv(7,7,7)))
      ).toDF("id", "vec")
    
    val aggregated = df
      .rdd
      .map{ case Row(k: Int, v: Vector) => (k, BDV(v.toDense.values)) }
      .foldByKey(BDV.zeros[Double](3))(_ += _)
      .mapValues(v => Vectors.dense(v.toArray))
      .toDF("id", "vec")
    
    aggregated.show
    
    // +---+--------------+
    // | id|           vec|
    // +---+--------------+
    // |  1| [5.0,2.0,7.0]|
    // |  2|[10.0,8.0,4.0]|
    // |  3|[7.0,15.0,9.0]|
    // +---+--------------+
    

    그리고 단지 비교를 위해 "간단한"UDAF. 필수 수입 :

    import org.apache.spark.sql.expressions.{MutableAggregationBuffer,
      UserDefinedAggregateFunction}
    import org.apache.spark.ml.linalg.{Vector, Vectors, SQLDataTypes}
    import org.apache.spark.sql.types.{StructType, ArrayType, DoubleType}
    import org.apache.spark.sql.Row
    import scala.collection.mutable.WrappedArray
    

    클래스 정의 :

    class VectorSum (n: Int) extends UserDefinedAggregateFunction {
        def inputSchema = new StructType().add("v", SQLDataTypes.VectorType)
        def bufferSchema = new StructType().add("buff", ArrayType(DoubleType))
        def dataType = SQLDataTypes.VectorType
        def deterministic = true 
    
        def initialize(buffer: MutableAggregationBuffer) = {
          buffer.update(0, Array.fill(n)(0.0))
        }
    
        def update(buffer: MutableAggregationBuffer, input: Row) = {
          if (!input.isNullAt(0)) {
            val buff = buffer.getAs[WrappedArray[Double]](0) 
            val v = input.getAs[Vector](0).toSparse
            for (i <- v.indices) {
              buff(i) += v(i)
            }
            buffer.update(0, buff)
          }
        }
    
        def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
          val buff1 = buffer1.getAs[WrappedArray[Double]](0) 
          val buff2 = buffer2.getAs[WrappedArray[Double]](0) 
          for ((x, i) <- buff2.zipWithIndex) {
            buff1(i) += x
          }
          buffer1.update(0, buff1)
        }
    
        def evaluate(buffer: Row) =  Vectors.dense(
          buffer.getAs[Seq[Double]](0).toArray)
    } 
    

    그리고 예를 들어 사용 :

    df.groupBy($"id").agg(new VectorSum(3)($"vec") alias "vec").show
    
    // +---+--------------+
    // | id|           vec|
    // +---+--------------+
    // |  1| [5.0,2.0,7.0]|
    // |  2|[10.0,8.0,4.0]|
    // |  3|[7.0,15.0,9.0]|
    // +---+--------------+
    

    또한 참조 : 어떻게 스파크 SQL에서 그룹화 벡터 컬럼의 평균 찾을 수?

  2. ==============================

    2.나는 그것을 최적화 할 수있는 다음 (이후 스파크 2.0.2에 작품을) 제안하지만 사전에 알아야 할 한 가지 당신이 UDAF 인스턴스를 만들 때 벡터 크기가 매우 좋다

    나는 그것을 최적화 할 수있는 다음 (이후 스파크 2.0.2에 작품을) 제안하지만 사전에 알아야 할 한 가지 당신이 UDAF 인스턴스를 만들 때 벡터 크기가 매우 좋다

    import org.apache.spark.ml.linalg._
    import org.apache.spark.mllib.linalg.WeightedSparseVector
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._
    
    class VectorAggregate(val numFeatures: Int)
       extends UserDefinedAggregateFunction {
    
    private type B = Map[Int, Double]
    
    def inputSchema: StructType = StructType(StructField("vec", new VectorUDT()) :: Nil)
    
    def bufferSchema: StructType =
    StructType(StructField("agg", MapType(IntegerType, DoubleType)) :: Nil)
    
    def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer.update(0, Map.empty[Int, Double])
    
    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        val zero = buffer.getAs[B](0)
        input match {
            case Row(DenseVector(values)) => buffer.update(0, values.zipWithIndex.foldLeft(zero){case (acc,(v,i)) => acc.updated(i, v + acc.getOrElse(i,0d))})
            case Row(SparseVector(_, indices, values)) => buffer.update(0, values.zip(indices).foldLeft(zero){case (acc,(v,i)) => acc.updated(i, v + acc.getOrElse(i,0d))}) }}
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val zero = buffer1.getAs[B](0)
    buffer1.update(0, buffer2.getAs[B](0).foldLeft(zero){case (acc,(i,v)) => acc.updated(i, v + acc.getOrElse(i,0d))})}
    
    def deterministic: Boolean = true
    
    def evaluate(buffer: Row): Any = {
        val Row(agg: B) = buffer
        val indices = agg.keys.toArray.sorted
        Vectors.sparse(numFeatures,indices,indices.map(agg)).compressed
    }
    
    def dataType: DataType = new VectorUDT()
    }
    
  3. from https://stackoverflow.com/questions/33899977/how-to-define-a-custom-aggregation-function-to-sum-a-column-of-vectors by cc-by-sa and MIT license