스파크 dataframe에서 두 행의 차이

나는 GROUPBY의 컬럼 1과 날짜, 스파크에 dataframe를 생성 양을 계산 하였다.

val table = df1.groupBy($"column1",$"date").sum("amount")
Column1 |Date   |Amount
A   |1-jul  |1000
A   |1-june |2000
A   |1-May  |2000
A   |1-dec  |3000
A   |1-Nov  |2000
B   |1-jul  |100
B   |1-june |300    
B   |1-May  |400
B   |1-dec  |300

지금, 나는 테이블에서 두 날짜의 양의 차이와 함께, 새로운 열을 추가 할 수 있습니다.


    1.계산은 당신이 창으로 지연 및 리드 기능을 사용할 수있는 경우 등 ... 이전 달 사이의 차이를 계산, 또는 이전 두 달 사이에 계산으로 고정되어있는 경우에는 윈도우 기능을 사용할 수 있습니다.

    그러나위한 당신이 아래 그래서이 주문할 수있는 날짜 열을 변경할 필요가있다.

    |Column1|Date  |Date_Converted|Amount|
    |A      |1-jul |2017-07-01    |1000  |
    |A      |1-june|2017-06-01    |2000  |
    |A      |1-May |2017-05-01    |2000  |
    |A      |1-dec |2017-12-01    |3000  |
    |A      |1-Nov |2017-11-01    |2000  |
    |B      |1-jul |2017-07-01    |100   |
    |B      |1-june|2017-06-01    |300   |
    |B      |1-May |2017-05-01    |400   |
    |B      |1-dec |2017-12-01    |300   |

    당신은 수행하여 지난 달과 이번 달의 차이를 찾을 수 있습니다

    import org.apache.spark.sql.expressions._
    val windowSpec = Window.partitionBy("Column1").orderBy("Date_Converted")
    import org.apache.spark.sql.functions._
    df.withColumn("diff_Amt_With_Prev_Month", $"Amount" - when((lag("Amount", 1).over(windowSpec)).isNull, 0).otherwise(lag("Amount", 1).over(windowSpec)))


    |Column1|Date  |Date_Converted|Amount|diff_Amt_With_Prev_Month|
    |B      |1-May |2017-05-01    |400   |400.0                   |
    |B      |1-june|2017-06-01    |300   |-100.0                  |
    |B      |1-jul |2017-07-01    |100   |-200.0                  |
    |B      |1-dec |2017-12-01    |300   |200.0                   |
    |A      |1-May |2017-05-01    |2000  |2000.0                  |
    |A      |1-june|2017-06-01    |2000  |0.0                     |
    |A      |1-jul |2017-07-01    |1000  |-1000.0                 |
    |A      |1-Nov |2017-11-01    |2000  |1000.0                  |
    |A      |1-dec |2017-12-01    |3000  |1000.0                  |

    당신은 이전 두 달 동안 지체 위치를 증가시킬 수있다

    df.withColumn("diff_Amt_With_Prev_two_Month", $"Amount" - when((lag("Amount", 2).over(windowSpec)).isNull, 0).otherwise(lag("Amount", 2).over(windowSpec)))

    당신에게 줄 것이다

    |Column1|Date  |Date_Converted|Amount|diff_Amt_With_Prev_two_Month|
    |B      |1-May |2017-05-01    |400   |400.0                       |
    |B      |1-june|2017-06-01    |300   |300.0                       |
    |B      |1-jul |2017-07-01    |100   |-300.0                      |
    |B      |1-dec |2017-12-01    |300   |0.0                         |
    |A      |1-May |2017-05-01    |2000  |2000.0                      |
    |A      |1-june|2017-06-01    |2000  |2000.0                      |
    |A      |1-jul |2017-07-01    |1000  |-1000.0                     |
    |A      |1-Nov |2017-11-01    |2000  |0.0                         |
    |A      |1-dec |2017-12-01    |3000  |2000.0                      |

    나는 대답은 도움이 희망

    2.테이블의 각 그룹에 속하는 두 날짜를 가정

    내 수입 :

    import org.apache.spark.sql.functions.{concat_ws,collect_list,lit}

    dataframe 전

    scala> val seqRow = Seq(
     | ("A","1- jul",1000),
     | ("A","1-june",2000),
     | ("A","1-May",2000),
     | ("A","1-dec",3000),
     | ("B","1-jul",100),
     | ("B","1-june",300),
     | ("B","1-May",400),
     | ("B","1-dec",300))
    seqRow: Seq[(String, String, Int)] = List((A,1- jul,1000), (A,1-june,2000), (A,1-May,2000), (A,1-dec,3000), (B,1-jul,100), (B,1-june,300), (B,1-May,400), (B,1-dec,300))
    scala> val input_df = sc.parallelize(seqRow).toDF("column1","date","amount")
    input_df: org.apache.spark.sql.DataFrame = [column1: string, date: string ... 1 more field]

    이제 당신의 사건에 대한 UDF를 작성,

    scala> def calc_diff = udf((list : Seq[String],startMonth : String,endMonth : String) => {
         |     //get the month and their values
         |     val monthMap = list.map{str =>
         |     val splitText = str.split("\\$")
         |     val month = splitText(0).split("-")(1).trim
         |         (month.toLowerCase,splitText(1).toInt)
         |     }.toMap
         |     val stMnth = monthMap(startMonth)
         |     val endMnth = monthMap(endMonth)
         |     endMnth - stMnth
         | })
    calc_diff: org.apache.spark.sql.expressions.UserDefinedFunction

    이제 출력을 준비

    scala> val (month1 : String,month2 : String) = ("jul","dec")
    month1: String = jul
    month2: String = dec
    scala> val req_df = group_df.withColumn("diff",calc_diff('collect_val,lit(month1.toLowerCase),lit(month2.toLowerCase)))
    req_df: org.apache.spark.sql.DataFrame = [column1: string, sum_amount: bigint ... 2 more fields]
    scala> val req_df = group_df.withColumn("diff",calc_diff('collect_val,lit(month1.toLowerCase),lit(month2.toLowerCase))).drop('collect_val)
    req_df: org.apache.spark.sql.DataFrame = [column1: string, sum_amount: bigint ... 1 more field]
    scala> req_df.orderBy('column1).show
    |      A|      8000|2000|
    |      B|      1100| 200|

    희망이 당신이 원하는합니다.

    (table.filter($"Date".isin("1-jul", "1-dec"))
          .withColumn("diff", $"1-dec" - $"1-jul")
    |      B|  300|  100| 200|
    |      A| 3000| 1000|2000|
