[SQL] 불꽃의 그룹과 시간 창을 압연을 통해 집계하는 방법
SQL불꽃의 그룹과 시간 창을 압연을 통해 집계하는 방법
내가 특정 열을 기준으로 그룹화 할 몇 가지 데이터가 다음 그룹에서 롤링 시간 창에 따라 필드 시리즈를 집계.
다음은 몇 가지 예를 들어 데이터입니다 :
df = spark.createDataFrame([Row(date='2016-01-01', group_by='group1', get_avg=5, get_first=1),
Row(date='2016-01-10', group_by='group1', get_avg=5, get_first=2),
Row(date='2016-02-01', group_by='group2', get_avg=10, get_first=3),
Row(date='2016-02-28', group_by='group2', get_avg=20, get_first=3),
Row(date='2016-02-29', group_by='group2', get_avg=30, get_first=3),
Row(date='2016-04-02', group_by='group2', get_avg=8, get_first=4)])
나는 다음 가장 빠른 날짜에 시작 시간 창을 만들고 그 그룹에 대한 항목과 30 일이 될 때까지 확장 GROUP_BY에 의해 그룹으로합니다. 그 30 일 동안 후, 다음에 창은 이전 창에 해당되지 않은 다음 행의 날짜를 시작합니다.
그때 get_avg의 평균을 받고 예를 들어, 집계 할, 그리고 get_first의 첫 번째 결과.
그래서 예를 들어 출력이되어야합니다 :
group_by first date of window get_avg get_first
group1 2016-01-01 5 1
group2 2016-02-01 20 3
group2 2016-04-02 8 4
편집 : 미안 해요 내 질문이 제대로 지정되지 않았습니다 깨달았다. 실제로 창을 원하는 활동의 종료 후 30 일이. 그에 따라 I 예의 그룹 2 부분을 수정했다.
해결법
-
==============================
1.개정 대답 :
개정 대답 :
여기 간단한 윈도우 함수의 트릭을 사용할 수 있습니다. 수입의 무리 :
from pyspark.sql.functions import coalesce, col, datediff, lag, lit, sum as sum_ from pyspark.sql.window import Window
창 정의 :
w = Window.partitionBy("group_by").orderBy("date")
DateType에 날짜를 캐스트 :
df_ = df.withColumn("date", col("date").cast("date"))
다음 식을 정의합니다 :
# Difference from the previous record or 0 if this is the first one diff = coalesce(datediff("date", lag("date", 1).over(w)), lit(0)) # 0 if diff <= 30, 1 otherwise indicator = (diff > 30).cast("integer") # Cumulative sum of indicators over the window subgroup = sum_(indicator).over(w).alias("subgroup")
테이블에 하위 그룹 식을 추가 :
df_.select("*", subgroup).groupBy("group_by", "subgroup").avg("get_avg")
+--------+--------+------------+ |group_by|subgroup|avg(get_avg)| +--------+--------+------------+ | group1| 0| 5.0| | group2| 0| 20.0| | group2| 1| 8.0| +--------+--------+------------+
첫 번째는 집계와 의미가 아니라, 열이 일정하게 증가하는 경우 당신은 분을 사용할 수 있습니다. 그렇지 않으면 당신은뿐만 아니라 윈도우 함수를 사용해야합니다.
스파크 2.1를 사용하여 테스트. 이전 스파크 출시와 함께 사용할 때 하위 쿼리 및 윈도우 인스턴스를 필요로 할 수있다.
원래 대답 (지정된 범위에 해당되지 않음)
스파크 2.0 이후 당신은 윈도우 기능을 사용할 수 있어야합니다 :
from pyspark.sql.functions import window df.groupBy(window("date", windowDuration="30 days")).count()
하지만, 결과에서 볼 수
+---------------------------------------------+-----+ |window |count| +---------------------------------------------+-----+ |[2016-01-30 01:00:00.0,2016-02-29 01:00:00.0]|1 | |[2015-12-31 01:00:00.0,2016-01-30 01:00:00.0]|2 | |[2016-03-30 02:00:00.0,2016-04-29 02:00:00.0]|1 | +---------------------------------------------+-----+
이 시간대에 올 때 당신은 조금 조심해야합니다.
from https://stackoverflow.com/questions/41711716/how-to-aggregate-over-rolling-time-window-with-groups-in-spark by cc-by-sa and MIT license
'SQL' 카테고리의 다른 글
[SQL] 빠른 속도로 ADO.NET을위한 도서관은 .csv 파일에서 데이터베이스에 데이터를 삽입 일괄? (0) | 2020.06.30 |
---|---|
[SQL] 자동 증가는 자동 증가 필드없이 데이터베이스에 필드 (0) | 2020.06.30 |
[SQL] 요일과 같은 형식의 날짜 (0) | 2020.06.30 |
[SQL] 테이블에서 '다음'과 '이전'에 대한 SQL은 무엇인가? (0) | 2020.06.30 |
[SQL] 데이터베이스 디자인 질문 - 카테고리 / 하위 카테고리 (0) | 2020.06.30 |