복붙노트

[SCALA] 스파크 dataframe에서 두 행의 차이

SCALA

스파크 dataframe에서 두 행의 차이

나는 GROUPBY의 컬럼 1과 날짜, 스파크에 dataframe를 생성 양을 계산 하였다.

val table = df1.groupBy($"column1",$"date").sum("amount")
Column1 |Date   |Amount
A   |1-jul  |1000
A   |1-june |2000
A   |1-May  |2000
A   |1-dec  |3000
A   |1-Nov  |2000
B   |1-jul  |100
B   |1-june |300    
B   |1-May  |400
B   |1-dec  |300

지금, 나는 테이블에서 두 날짜의 양의 차이와 함께, 새로운 열을 추가 할 수 있습니다.

해결법

  1. ==============================

    1.계산은 당신이 창으로 지연 및 리드 기능을 사용할 수있는 경우 등 ... 이전 달 사이의 차이를 계산, 또는 이전 두 달 사이에 계산으로 고정되어있는 경우에는 윈도우 기능을 사용할 수 있습니다.

    계산은 당신이 창으로 지연 및 리드 기능을 사용할 수있는 경우 등 ... 이전 달 사이의 차이를 계산, 또는 이전 두 달 사이에 계산으로 고정되어있는 경우에는 윈도우 기능을 사용할 수 있습니다.

    그러나위한 당신이 아래 그래서이 주문할 수있는 날짜 열을 변경할 필요가있다.

    +-------+------+--------------+------+
    |Column1|Date  |Date_Converted|Amount|
    +-------+------+--------------+------+
    |A      |1-jul |2017-07-01    |1000  |
    |A      |1-june|2017-06-01    |2000  |
    |A      |1-May |2017-05-01    |2000  |
    |A      |1-dec |2017-12-01    |3000  |
    |A      |1-Nov |2017-11-01    |2000  |
    |B      |1-jul |2017-07-01    |100   |
    |B      |1-june|2017-06-01    |300   |
    |B      |1-May |2017-05-01    |400   |
    |B      |1-dec |2017-12-01    |300   |
    +-------+------+--------------+------+
    

    당신은 수행하여 지난 달과 이번 달의 차이를 찾을 수 있습니다

    import org.apache.spark.sql.expressions._
    val windowSpec = Window.partitionBy("Column1").orderBy("Date_Converted")
    import org.apache.spark.sql.functions._
    df.withColumn("diff_Amt_With_Prev_Month", $"Amount" - when((lag("Amount", 1).over(windowSpec)).isNull, 0).otherwise(lag("Amount", 1).over(windowSpec)))
       .show(false)
    

    당신은해야한다

    +-------+------+--------------+------+------------------------+
    |Column1|Date  |Date_Converted|Amount|diff_Amt_With_Prev_Month|
    +-------+------+--------------+------+------------------------+
    |B      |1-May |2017-05-01    |400   |400.0                   |
    |B      |1-june|2017-06-01    |300   |-100.0                  |
    |B      |1-jul |2017-07-01    |100   |-200.0                  |
    |B      |1-dec |2017-12-01    |300   |200.0                   |
    |A      |1-May |2017-05-01    |2000  |2000.0                  |
    |A      |1-june|2017-06-01    |2000  |0.0                     |
    |A      |1-jul |2017-07-01    |1000  |-1000.0                 |
    |A      |1-Nov |2017-11-01    |2000  |1000.0                  |
    |A      |1-dec |2017-12-01    |3000  |1000.0                  |
    +-------+------+--------------+------+------------------------+
    

    당신은 이전 두 달 동안 지체 위치를 증가시킬 수있다

    df.withColumn("diff_Amt_With_Prev_two_Month", $"Amount" - when((lag("Amount", 2).over(windowSpec)).isNull, 0).otherwise(lag("Amount", 2).over(windowSpec)))
      .show(false)
    

    당신에게 줄 것이다

    +-------+------+--------------+------+----------------------------+
    |Column1|Date  |Date_Converted|Amount|diff_Amt_With_Prev_two_Month|
    +-------+------+--------------+------+----------------------------+
    |B      |1-May |2017-05-01    |400   |400.0                       |
    |B      |1-june|2017-06-01    |300   |300.0                       |
    |B      |1-jul |2017-07-01    |100   |-300.0                      |
    |B      |1-dec |2017-12-01    |300   |0.0                         |
    |A      |1-May |2017-05-01    |2000  |2000.0                      |
    |A      |1-june|2017-06-01    |2000  |2000.0                      |
    |A      |1-jul |2017-07-01    |1000  |-1000.0                     |
    |A      |1-Nov |2017-11-01    |2000  |0.0                         |
    |A      |1-dec |2017-12-01    |3000  |2000.0                      |
    +-------+------+--------------+------+----------------------------+
    

    나는 대답은 도움이 희망

  2. ==============================

    2.테이블의 각 그룹에 속하는 두 날짜를 가정

    테이블의 각 그룹에 속하는 두 날짜를 가정

    내 수입 :

    import org.apache.spark.sql.functions.{concat_ws,collect_list,lit}
    

    dataframe 전

    scala> val seqRow = Seq(
     | ("A","1- jul",1000),
     | ("A","1-june",2000),
     | ("A","1-May",2000),
     | ("A","1-dec",3000),
     | ("B","1-jul",100),
     | ("B","1-june",300),
     | ("B","1-May",400),
     | ("B","1-dec",300))
    
    seqRow: Seq[(String, String, Int)] = List((A,1- jul,1000), (A,1-june,2000), (A,1-May,2000), (A,1-dec,3000), (B,1-jul,100), (B,1-june,300), (B,1-May,400), (B,1-dec,300))
    
    scala> val input_df = sc.parallelize(seqRow).toDF("column1","date","amount")
    input_df: org.apache.spark.sql.DataFrame = [column1: string, date: string ... 1 more field]
    

    이제 당신의 사건에 대한 UDF를 작성,

    scala> def calc_diff = udf((list : Seq[String],startMonth : String,endMonth : String) => {
         |     //get the month and their values
         |     val monthMap = list.map{str =>
         |     val splitText = str.split("\\$")
         |     val month = splitText(0).split("-")(1).trim
         |
         |         (month.toLowerCase,splitText(1).toInt)
         |     }.toMap
         |
         |     val stMnth = monthMap(startMonth)
         |     val endMnth = monthMap(endMonth)
         |     endMnth - stMnth
         |
         | })
    calc_diff: org.apache.spark.sql.expressions.UserDefinedFunction
    

    이제 출력을 준비

    scala> val (month1 : String,month2 : String) = ("jul","dec")
    month1: String = jul
    month2: String = dec
    
    scala> val req_df = group_df.withColumn("diff",calc_diff('collect_val,lit(month1.toLowerCase),lit(month2.toLowerCase)))
    req_df: org.apache.spark.sql.DataFrame = [column1: string, sum_amount: bigint ... 2 more fields]
    
    scala> val req_df = group_df.withColumn("diff",calc_diff('collect_val,lit(month1.toLowerCase),lit(month2.toLowerCase))).drop('collect_val)
    req_df: org.apache.spark.sql.DataFrame = [column1: string, sum_amount: bigint ... 1 more field]
    
    scala> req_df.orderBy('column1).show
    +-------+----------+----+
    |column1|sum_amount|diff|
    +-------+----------+----+
    |      A|      8000|2000|
    |      B|      1100| 200|
    +-------+----------+----+
    

    희망이 당신이 원하는합니다.

  3. ==============================

    3.

    (table.filter($"Date".isin("1-jul", "1-dec"))
          .groupBy("Column1")
          .pivot("Date")
          .agg(first($"Amount"))
          .withColumn("diff", $"1-dec" - $"1-jul")
    ).show
    +-------+-----+-----+----+
    |Column1|1-dec|1-jul|diff|
    +-------+-----+-----+----+
    |      B|  300|  100| 200|
    |      A| 3000| 1000|2000|
    +-------+-----+-----+----+
    
  4. from https://stackoverflow.com/questions/45527208/difference-between-two-rows-in-spark-dataframe by cc-by-sa and MIT license