[SCALA] 어떻게 벡터의 열을 합계를 사용자 정의 집계 함수를 정의?
SCALA어떻게 벡터의 열을 합계를 사용자 정의 집계 함수를 정의?
나는 두 개의 열을 입력 지능 입력 벡터의 VEC (org.apache.spark.mllib.linalg.Vector)의 ID의 DataFrame 있습니다.
DataFrame은 다음과 같다 :
ID,Vec
1,[0,0,5]
1,[4,0,1]
1,[1,2,1]
2,[7,5,0]
2,[3,3,4]
3,[0,8,1]
3,[0,0,1]
3,[7,7,7]
....
나는 GROUPBY ($ "ID")를 수행 한 후 벡터를 합산하여 각 그룹 내부의 행에 집계를 적용하고 싶습니다.
상기 예는 원하는 출력 될 것이다 :
ID,SumOfVectors
1,[5,2,7]
2,[10,8,4]
3,[7,15,9]
...
사용 가능한 집계 함수는, 예를 들어, 작동하지 않습니다 df.groupBy ($ "ID"). AGG (SUM ($ "VEC는")는 ClassCastException이 이어질 것이다.
어떻게 나를 벡터 또는 배열이나 다른 사용자 작업의 합을 수행 할 수 있습니다 사용자 정의 집계 함수를 구현하는 방법?
해결법
-
==============================
1.개인적으로 나는 UDAFs 귀찮게하지 않을 것입니다. 자세한 것보다 더 정확히 빨리있다 (bufferSchema 성능 문제로 ArrayType와 UDAF 스파크) 대신 단순히 사용하는 것이 reduceByKey / foldByKey :
개인적으로 나는 UDAFs 귀찮게하지 않을 것입니다. 자세한 것보다 더 정확히 빨리있다 (bufferSchema 성능 문제로 ArrayType와 UDAF 스파크) 대신 단순히 사용하는 것이 reduceByKey / foldByKey :
import org.apache.spark.sql.Row import breeze.linalg.{DenseVector => BDV} import org.apache.spark.ml.linalg.{Vector, Vectors} def dv(values: Double*): Vector = Vectors.dense(values.toArray) val df = spark.createDataFrame(Seq( (1, dv(0,0,5)), (1, dv(4,0,1)), (1, dv(1,2,1)), (2, dv(7,5,0)), (2, dv(3,3,4)), (3, dv(0,8,1)), (3, dv(0,0,1)), (3, dv(7,7,7))) ).toDF("id", "vec") val aggregated = df .rdd .map{ case Row(k: Int, v: Vector) => (k, BDV(v.toDense.values)) } .foldByKey(BDV.zeros[Double](3))(_ += _) .mapValues(v => Vectors.dense(v.toArray)) .toDF("id", "vec") aggregated.show // +---+--------------+ // | id| vec| // +---+--------------+ // | 1| [5.0,2.0,7.0]| // | 2|[10.0,8.0,4.0]| // | 3|[7.0,15.0,9.0]| // +---+--------------+
그리고 단지 비교를 위해 "간단한"UDAF. 필수 수입 :
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.ml.linalg.{Vector, Vectors, SQLDataTypes} import org.apache.spark.sql.types.{StructType, ArrayType, DoubleType} import org.apache.spark.sql.Row import scala.collection.mutable.WrappedArray
클래스 정의 :
class VectorSum (n: Int) extends UserDefinedAggregateFunction { def inputSchema = new StructType().add("v", SQLDataTypes.VectorType) def bufferSchema = new StructType().add("buff", ArrayType(DoubleType)) def dataType = SQLDataTypes.VectorType def deterministic = true def initialize(buffer: MutableAggregationBuffer) = { buffer.update(0, Array.fill(n)(0.0)) } def update(buffer: MutableAggregationBuffer, input: Row) = { if (!input.isNullAt(0)) { val buff = buffer.getAs[WrappedArray[Double]](0) val v = input.getAs[Vector](0).toSparse for (i <- v.indices) { buff(i) += v(i) } buffer.update(0, buff) } } def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = { val buff1 = buffer1.getAs[WrappedArray[Double]](0) val buff2 = buffer2.getAs[WrappedArray[Double]](0) for ((x, i) <- buff2.zipWithIndex) { buff1(i) += x } buffer1.update(0, buff1) } def evaluate(buffer: Row) = Vectors.dense( buffer.getAs[Seq[Double]](0).toArray) }
그리고 예를 들어 사용 :
df.groupBy($"id").agg(new VectorSum(3)($"vec") alias "vec").show // +---+--------------+ // | id| vec| // +---+--------------+ // | 1| [5.0,2.0,7.0]| // | 2|[10.0,8.0,4.0]| // | 3|[7.0,15.0,9.0]| // +---+--------------+
또한 참조 : 어떻게 스파크 SQL에서 그룹화 벡터 컬럼의 평균 찾을 수?
-
==============================
2.나는 그것을 최적화 할 수있는 다음 (이후 스파크 2.0.2에 작품을) 제안하지만 사전에 알아야 할 한 가지 당신이 UDAF 인스턴스를 만들 때 벡터 크기가 매우 좋다
나는 그것을 최적화 할 수있는 다음 (이후 스파크 2.0.2에 작품을) 제안하지만 사전에 알아야 할 한 가지 당신이 UDAF 인스턴스를 만들 때 벡터 크기가 매우 좋다
import org.apache.spark.ml.linalg._ import org.apache.spark.mllib.linalg.WeightedSparseVector import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types._ class VectorAggregate(val numFeatures: Int) extends UserDefinedAggregateFunction { private type B = Map[Int, Double] def inputSchema: StructType = StructType(StructField("vec", new VectorUDT()) :: Nil) def bufferSchema: StructType = StructType(StructField("agg", MapType(IntegerType, DoubleType)) :: Nil) def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0, Map.empty[Int, Double]) def update(buffer: MutableAggregationBuffer, input: Row): Unit = { val zero = buffer.getAs[B](0) input match { case Row(DenseVector(values)) => buffer.update(0, values.zipWithIndex.foldLeft(zero){case (acc,(v,i)) => acc.updated(i, v + acc.getOrElse(i,0d))}) case Row(SparseVector(_, indices, values)) => buffer.update(0, values.zip(indices).foldLeft(zero){case (acc,(v,i)) => acc.updated(i, v + acc.getOrElse(i,0d))}) }} def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { val zero = buffer1.getAs[B](0) buffer1.update(0, buffer2.getAs[B](0).foldLeft(zero){case (acc,(i,v)) => acc.updated(i, v + acc.getOrElse(i,0d))})} def deterministic: Boolean = true def evaluate(buffer: Row): Any = { val Row(agg: B) = buffer val indices = agg.keys.toArray.sorted Vectors.sparse(numFeatures,indices,indices.map(agg)).compressed } def dataType: DataType = new VectorUDT() }
from https://stackoverflow.com/questions/33899977/how-to-define-a-custom-aggregation-function-to-sum-a-column-of-vectors by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 때문에 긴 RDD 리니지에 유래 (0) | 2019.11.02 |
---|---|
[SCALA] 규모 :지도 병합 (0) | 2019.11.02 |
[SCALA] 새로운 스칼라 반사 API와 동반자 객체의 인스턴스를 가져옵니다 (0) | 2019.11.02 |
[SCALA] 스칼라 메소드 호출에서 괄호에 대한 규칙은 무엇인가? (0) | 2019.11.02 |
[SCALA] 스칼라의 "접미사 작전" (0) | 2019.11.02 |