[SCALA] 스파크 dataframe에서 두 행의 차이
SCALA스파크 dataframe에서 두 행의 차이
나는 GROUPBY의 컬럼 1과 날짜, 스파크에 dataframe를 생성 양을 계산 하였다.
val table = df1.groupBy($"column1",$"date").sum("amount")
Column1 |Date |Amount
A |1-jul |1000
A |1-june |2000
A |1-May |2000
A |1-dec |3000
A |1-Nov |2000
B |1-jul |100
B |1-june |300
B |1-May |400
B |1-dec |300
지금, 나는 테이블에서 두 날짜의 양의 차이와 함께, 새로운 열을 추가 할 수 있습니다.
해결법
-
==============================
1.계산은 당신이 창으로 지연 및 리드 기능을 사용할 수있는 경우 등 ... 이전 달 사이의 차이를 계산, 또는 이전 두 달 사이에 계산으로 고정되어있는 경우에는 윈도우 기능을 사용할 수 있습니다.
계산은 당신이 창으로 지연 및 리드 기능을 사용할 수있는 경우 등 ... 이전 달 사이의 차이를 계산, 또는 이전 두 달 사이에 계산으로 고정되어있는 경우에는 윈도우 기능을 사용할 수 있습니다.
그러나위한 당신이 아래 그래서이 주문할 수있는 날짜 열을 변경할 필요가있다.
+-------+------+--------------+------+ |Column1|Date |Date_Converted|Amount| +-------+------+--------------+------+ |A |1-jul |2017-07-01 |1000 | |A |1-june|2017-06-01 |2000 | |A |1-May |2017-05-01 |2000 | |A |1-dec |2017-12-01 |3000 | |A |1-Nov |2017-11-01 |2000 | |B |1-jul |2017-07-01 |100 | |B |1-june|2017-06-01 |300 | |B |1-May |2017-05-01 |400 | |B |1-dec |2017-12-01 |300 | +-------+------+--------------+------+
당신은 수행하여 지난 달과 이번 달의 차이를 찾을 수 있습니다
import org.apache.spark.sql.expressions._ val windowSpec = Window.partitionBy("Column1").orderBy("Date_Converted") import org.apache.spark.sql.functions._ df.withColumn("diff_Amt_With_Prev_Month", $"Amount" - when((lag("Amount", 1).over(windowSpec)).isNull, 0).otherwise(lag("Amount", 1).over(windowSpec))) .show(false)
당신은해야한다
+-------+------+--------------+------+------------------------+ |Column1|Date |Date_Converted|Amount|diff_Amt_With_Prev_Month| +-------+------+--------------+------+------------------------+ |B |1-May |2017-05-01 |400 |400.0 | |B |1-june|2017-06-01 |300 |-100.0 | |B |1-jul |2017-07-01 |100 |-200.0 | |B |1-dec |2017-12-01 |300 |200.0 | |A |1-May |2017-05-01 |2000 |2000.0 | |A |1-june|2017-06-01 |2000 |0.0 | |A |1-jul |2017-07-01 |1000 |-1000.0 | |A |1-Nov |2017-11-01 |2000 |1000.0 | |A |1-dec |2017-12-01 |3000 |1000.0 | +-------+------+--------------+------+------------------------+
당신은 이전 두 달 동안 지체 위치를 증가시킬 수있다
df.withColumn("diff_Amt_With_Prev_two_Month", $"Amount" - when((lag("Amount", 2).over(windowSpec)).isNull, 0).otherwise(lag("Amount", 2).over(windowSpec))) .show(false)
당신에게 줄 것이다
+-------+------+--------------+------+----------------------------+ |Column1|Date |Date_Converted|Amount|diff_Amt_With_Prev_two_Month| +-------+------+--------------+------+----------------------------+ |B |1-May |2017-05-01 |400 |400.0 | |B |1-june|2017-06-01 |300 |300.0 | |B |1-jul |2017-07-01 |100 |-300.0 | |B |1-dec |2017-12-01 |300 |0.0 | |A |1-May |2017-05-01 |2000 |2000.0 | |A |1-june|2017-06-01 |2000 |2000.0 | |A |1-jul |2017-07-01 |1000 |-1000.0 | |A |1-Nov |2017-11-01 |2000 |0.0 | |A |1-dec |2017-12-01 |3000 |2000.0 | +-------+------+--------------+------+----------------------------+
나는 대답은 도움이 희망
-
==============================
2.테이블의 각 그룹에 속하는 두 날짜를 가정
테이블의 각 그룹에 속하는 두 날짜를 가정
내 수입 :
import org.apache.spark.sql.functions.{concat_ws,collect_list,lit}
dataframe 전
scala> val seqRow = Seq( | ("A","1- jul",1000), | ("A","1-june",2000), | ("A","1-May",2000), | ("A","1-dec",3000), | ("B","1-jul",100), | ("B","1-june",300), | ("B","1-May",400), | ("B","1-dec",300)) seqRow: Seq[(String, String, Int)] = List((A,1- jul,1000), (A,1-june,2000), (A,1-May,2000), (A,1-dec,3000), (B,1-jul,100), (B,1-june,300), (B,1-May,400), (B,1-dec,300)) scala> val input_df = sc.parallelize(seqRow).toDF("column1","date","amount") input_df: org.apache.spark.sql.DataFrame = [column1: string, date: string ... 1 more field]
이제 당신의 사건에 대한 UDF를 작성,
scala> def calc_diff = udf((list : Seq[String],startMonth : String,endMonth : String) => { | //get the month and their values | val monthMap = list.map{str => | val splitText = str.split("\\$") | val month = splitText(0).split("-")(1).trim | | (month.toLowerCase,splitText(1).toInt) | }.toMap | | val stMnth = monthMap(startMonth) | val endMnth = monthMap(endMonth) | endMnth - stMnth | | }) calc_diff: org.apache.spark.sql.expressions.UserDefinedFunction
이제 출력을 준비
scala> val (month1 : String,month2 : String) = ("jul","dec") month1: String = jul month2: String = dec scala> val req_df = group_df.withColumn("diff",calc_diff('collect_val,lit(month1.toLowerCase),lit(month2.toLowerCase))) req_df: org.apache.spark.sql.DataFrame = [column1: string, sum_amount: bigint ... 2 more fields] scala> val req_df = group_df.withColumn("diff",calc_diff('collect_val,lit(month1.toLowerCase),lit(month2.toLowerCase))).drop('collect_val) req_df: org.apache.spark.sql.DataFrame = [column1: string, sum_amount: bigint ... 1 more field] scala> req_df.orderBy('column1).show +-------+----------+----+ |column1|sum_amount|diff| +-------+----------+----+ | A| 8000|2000| | B| 1100| 200| +-------+----------+----+
희망이 당신이 원하는합니다.
-
==============================
3.
(table.filter($"Date".isin("1-jul", "1-dec")) .groupBy("Column1") .pivot("Date") .agg(first($"Amount")) .withColumn("diff", $"1-dec" - $"1-jul") ).show +-------+-----+-----+----+ |Column1|1-dec|1-jul|diff| +-------+-----+-----+----+ | B| 300| 100| 200| | A| 3000| 1000|2000| +-------+-----+-----+----+
from https://stackoverflow.com/questions/45527208/difference-between-two-rows-in-spark-dataframe by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 스파크 날짜를 유닉스 타임 스탬프로 변환하는 방법 (0) | 2019.11.16 |
---|---|
[SCALA] 괄호없이 기능과의 차이 [중복] (0) | 2019.11.16 |
[SCALA] 생산에서 재생 서버를 시작 스칼라 (0) | 2019.11.16 |
[SCALA] 어떻게 스칼라에서 기능을 인수하는 방법을 조롱하는? (0) | 2019.11.16 |
[SCALA] 스칼라의 크로스 제품 (0) | 2019.11.16 |