복붙노트

[SCALA] 슬라이스 배열 방법 및 열의 합 요소?

SCALA

슬라이스 배열 방법 및 열의 합 요소?

I는 합계 싶은 (또는 다른 집합의 기능도 수행) SparkSQL를 사용하여 어레이 칼럼 것이다.

나는 테이블로가

+-------+-------+---------------------------------+
|dept_id|dept_nm|                      emp_details|
+-------+-------+---------------------------------+
|     10|Finance|        [100, 200, 300, 400, 500]|
|     20|     IT|                [10, 20, 50, 100]|
+-------+-------+---------------------------------+

나는이 emp_details 컬럼의 값을 요약하고 싶습니다.

예상 질의 :

sqlContext.sql("select sum(emp_details) from mytable").show

예상 결과

1500
180

또한 내가 좋아 너무 범위 요소를 요약 할 수 있어야한다 :

sqlContext.sql("select sum(slice(emp_details,0,3)) from mytable").show

결과

600
80

예상대로 어레이 형의 합을 수행 할 때 그 합이 인수는 수치 입력하지 어레이 형으로 기대 보여준다.

나는 우리가 이것에 대한 UDF를 만들 필요가 있다고 생각. 하지만 어떻게?

나는 UDF를 어떤 성능 안타를 직면 할 것인가? 그리고 UDF 하나에서 떨어져 다른 솔루션이 있습니까?

해결법

  1. ==============================

    1.스파크 2.4으로되어 스파크 SQL 지원 고차 함수 배열을 포함하여 복잡한 데이터 구조를 조작한다.

    스파크 2.4으로되어 스파크 SQL 지원 고차 함수 배열을 포함하여 복잡한 데이터 구조를 조작한다.

    다음과 같이 "현대"솔루션은 다음과 같습니다

    scala> input.show(false)
    +-------+-------+-------------------------+
    |dept_id|dept_nm|emp_details              |
    +-------+-------+-------------------------+
    |10     |Finance|[100, 200, 300, 400, 500]|
    |20     |IT     |[10, 20, 50, 100]        |
    +-------+-------+-------------------------+
    
    input.createOrReplaceTempView("mytable")
    
    val sqlText = "select dept_id, dept_nm, aggregate(emp_details, 0, (acc, value) -> acc + value) as sum from mytable"
    scala> sql(sqlText).show
    +-------+-------+----+
    |dept_id|dept_nm| sum|
    +-------+-------+----+
    |     10|Finance|1500|
    |     20|     IT| 180|
    +-------+-------+----+
    

    다음 기사 및 비디오 higher-order 함수에 읽기 좋은를 찾을 수 있습니다 :

    부인 때문에 나는 스파크 SQL이 Dataset.map을 실행하기 위해 수행하는 직렬화의 (그것이 가장 upvotes을 가지고에도 불구하고)이 방법을 권하고 싶지 않다. 질의 력 데이터를 역 직렬화하고 (JVM 외부 스파크에 의해 관리되는 메모리 영역으로부터) JVM에로드하는 스파크. 그것은 필연적으로 더 자주 GC를 초래할 따라서 성능 악화시킬 것입니다.

    한 가지 해결책은 스파크 SQL 및 스칼라의 조합이 그 힘을 보여줄 수있는 데이터 집합 솔루션을 사용하는 것입니다.

    scala> val inventory = Seq(
         |   (10, "Finance", Seq(100, 200, 300, 400, 500)),
         |   (20, "IT", Seq(10, 20, 50, 100))).toDF("dept_id", "dept_nm", "emp_details")
    inventory: org.apache.spark.sql.DataFrame = [dept_id: int, dept_nm: string ... 1 more field]
    
    // I'm too lazy today for a case class
    scala> inventory.as[(Long, String, Seq[Int])].
      map { case (deptId, deptName, details) => (deptId, deptName, details.sum) }.
      toDF("dept_id", "dept_nm", "sum").
      show
    +-------+-------+----+
    |dept_id|dept_nm| sum|
    +-------+-------+----+
    |     10|Finance|1500|
    |     20|     IT| 180|
    +-------+-------+----+
    

    이 똑같이 간단로서 나는 연습으로 슬라이스 부분을 떠날거야.

  2. ==============================

    2.가능한 방법은 배열 컬럼 () 폭발 사용하고, 결과적으로 고유 키에 의해 출력을 집계합니다. 예를 들면 :

    가능한 방법은 배열 컬럼 () 폭발 사용하고, 결과적으로 고유 키에 의해 출력을 집계합니다. 예를 들면 :

    import sqlContext.implicits._
    import org.apache.spark.sql.functions._
    
    (mytable
      .withColumn("emp_sum",
        explode($"emp_details"))
      .groupBy("dept_nm")
      .agg(sum("emp_sum")).show)
    +-------+------------+
    |dept_nm|sum(emp_sum)|
    +-------+------------+
    |Finance|        1500|
    |     IT|         180|
    +-------+------------+
    

    배열에서 특정 값을 선택하기 위해, 우리는 링크 된 질문의 대답 일을하고 약간의 수정으로 적용 할 수 있습니다 :

    val slice = udf((array : Seq[Int], from : Int, to : Int) => array.slice(from,to))
    
    (mytable
      .withColumn("slice", 
        slice($"emp_details", 
          lit(0), 
          lit(3)))
      .withColumn("emp_sum",
        explode($"slice"))
      .groupBy("dept_nm")
      .agg(sum("emp_sum")).show)
    +-------+------------+
    |dept_nm|sum(emp_sum)|
    +-------+------------+
    |Finance|         600|
    |     IT|          80|
    +-------+------------+
    

    데이터:

    val data = Seq((10, "Finance", Array(100,200,300,400,500)),
                   (20, "IT", Array(10,20,50,100)))
    val mytable = sc.parallelize(data).toDF("dept_id", "dept_nm","emp_details")
    
  3. ==============================

    3.스파크 2.4 이후에는 슬라이스 기능으로 슬라이스 수 있습니다 :

    스파크 2.4 이후에는 슬라이스 기능으로 슬라이스 수 있습니다 :

    import org.apache.spark.sql.functions.slice
    
    val df = Seq(
      (10, "Finance", Seq(100, 200, 300, 400, 500)),
      (20, "IT", Seq(10, 20, 50, 100))
    ).toDF("dept_id", "dept_nm", "emp_details")
    
    val dfSliced = df.withColumn(
       "emp_details_sliced",
       slice($"emp_details", 1, 3)
    )
    
    dfSliced.show(false)
    
    +-------+-------+-------------------------+------------------+
    |dept_id|dept_nm|emp_details              |emp_details_sliced|
    +-------+-------+-------------------------+------------------+
    |10     |Finance|[100, 200, 300, 400, 500]|[100, 200, 300]   |
    |20     |IT     |[10, 20, 50, 100]        |[10, 20, 50]      |
    +-------+-------+-------------------------+------------------+
    

    골재와 배열을 요약 :

    dfSliced.selectExpr(
      "*", 
      "aggregate(emp_details, 0, (x, y) -> x + y) as details_sum",  
      "aggregate(emp_details_sliced, 0, (x, y) -> x + y) as details_sliced_sum"
    ).show
    
    +-------+-------+--------------------+------------------+-----------+------------------+
    |dept_id|dept_nm|         emp_details|emp_details_sliced|details_sum|details_sliced_sum|
    +-------+-------+--------------------+------------------+-----------+------------------+
    |     10|Finance|[100, 200, 300, 4...|   [100, 200, 300]|       1500|               600|
    |     20|     IT|   [10, 20, 50, 100]|      [10, 20, 50]|        180|                80|
    +-------+-------+--------------------+------------------+-----------+------------------+
    
  4. ==============================

    4.여기 GROUPBY를 사용하지 않고 mtoto의 대답에 대한 대안입니다 (: UDF, mtoto 솔루션이나 광산, 코멘트 환영 정말 빠른 하나 인 모른다)

    여기 GROUPBY를 사용하지 않고 mtoto의 대답에 대한 대안입니다 (: UDF, mtoto 솔루션이나 광산, 코멘트 환영 정말 빠른 하나 인 모른다)

    당신은 것 일반적으로 UDF를 사용하여 성능에 영향. 이 읽고 할 수 있습니다 응답이며,이 자원은 UDF에 읽기 좋다.

    이제 문제에 대한, 당신은 UDF의 사용을 방지 할 수 있습니다. 내가 사용하는 것이 것은 스칼라 로직 생성 된 열 표현이다.

    데이터:

    val df = Seq((10, "Finance", Array(100,200,300,400,500)),
                      (20, "IT", Array(10,  20, 50,100)))
              .toDF("dept_id", "dept_nm","emp_details")
    

    당신은 ArrayType을 통과 할 수 있도록 약간의 속임수가 필요합니다, 당신은 여러 가지 문제를 발견 (슬라이스 부분 하단에있는 편집 참조) 솔루션으로 비트를 재생할 수 있습니다. 여기 내 제안하지만 당신은 더 나은 찾을 수 있습니다. 먼저 최대 길이를 가지고

    val maxLength = df.select(size('emp_details).as("l")).groupBy().max("l").first.getInt(0)
    

    당신이 짧은 배열이있을 때 그런 다음 테스트, 사용

    val sumArray = (1 until maxLength)
          .map(i => when(size('emp_details) > i,'emp_details(i)).otherwise(lit(0)))
          .reduce(_ + _)
          .as("sumArray")
    
    val res = df
      .select('dept_id,'dept_nm,'emp_details,sumArray)
    

    결과:

    +-------+-------+--------------------+--------+
    |dept_id|dept_nm|         emp_details|sumArray|
    +-------+-------+--------------------+--------+
    |     10|Finance|[100, 200, 300, 4...|    1500|
    |     20|     IT|   [10, 20, 50, 100]|     180|
    +-------+-------+--------------------+--------+
    

    나는 그것이 무엇을하고 있는지 이해하는 sumArray보고 조언.

    편집 : 물론 난 단지 다시 질문의 절반을 읽기 당신이 항목이 합계를 변경하려는 경우 ...하지만, 당신은 (즉, 당신은 슬라이스 기능이 필요하지 않습니다)는이 솔루션을 분명하게 볼 수 있습니다 당신이 필요로하는 인덱스의 범위 단지 변화 (최대 길이까지 0)

    def sumArray(from: Int, max: Int) = (from until max)
          .map(i => when(size('emp_details) > i,'emp_details(i)).otherwise(lit(0)))
          .reduce(_ + _)
          .as("sumArray")
    
  5. ==============================

    5.RDD 방법이없는, 그래서 내가 그것을 추가 할 수 있습니다.

    RDD 방법이없는, 그래서 내가 그것을 추가 할 수 있습니다.

    val df = Seq((10, "Finance", Array(100,200,300,400,500)),(20, "IT", Array(10,20,50,100))).toDF("dept_id", "dept_nm","emp_details")
    
    import scala.collection.mutable._
    
    val rdd1 = df.rdd.map( x=> {val p = x.getAs[mutable.WrappedArray[Int]]("emp_details").toArray; Row.merge(x,Row(p.sum,p.slice(0,2).sum)) })
    
    spark.createDataFrame(rdd1,df.schema.add(StructField("sumArray",IntegerType)).add(StructField("sliceArray",IntegerType))).show(false)
    

    산출:

    +-------+-------+-------------------------+--------+----------+
    |dept_id|dept_nm|emp_details              |sumArray|sliceArray|
    +-------+-------+-------------------------+--------+----------+
    |10     |Finance|[100, 200, 300, 400, 500]|1500    |300       |
    |20     |IT     |[10, 20, 50, 100]        |180     |30        |
    +-------+-------+-------------------------+--------+----------+
    
  6. from https://stackoverflow.com/questions/40151064/how-to-slice-and-sum-elements-of-array-column by cc-by-sa and MIT license