복붙노트

[SQL] 불꽃의 그룹과 시간 창을 압연을 통해 집계하는 방법

SQL

불꽃의 그룹과 시간 창을 압연을 통해 집계하는 방법

내가 특정 열을 기준으로 그룹화 할 몇 가지 데이터가 다음 그룹에서 롤링 시간 창에 따라 필드 시리즈를 집계.

다음은 몇 가지 예를 들어 데이터입니다 :

df = spark.createDataFrame([Row(date='2016-01-01', group_by='group1', get_avg=5, get_first=1),
                            Row(date='2016-01-10', group_by='group1', get_avg=5, get_first=2),
                            Row(date='2016-02-01', group_by='group2', get_avg=10, get_first=3),
                            Row(date='2016-02-28', group_by='group2', get_avg=20, get_first=3),
                            Row(date='2016-02-29', group_by='group2', get_avg=30, get_first=3),
                            Row(date='2016-04-02', group_by='group2', get_avg=8, get_first=4)])

나는 다음 가장 빠른 날짜에 시작 시간 창을 만들고 그 그룹에 대한 항목과 30 일이 될 때까지 확장 GROUP_BY에 의해 그룹으로합니다. 그 30 일 동안 후, 다음에 창은 이전 창에 해당되지 않은 다음 행의 날짜를 시작합니다.

그때 get_avg의 평균을 받고 예를 들어, 집계 할, 그리고 get_first의 첫 번째 결과.

그래서 예를 들어 출력이되어야합니다 :

group_by    first date of window    get_avg  get_first
group1      2016-01-01              5        1
group2      2016-02-01              20       3
group2      2016-04-02              8        4

편집 : 미안 해요 내 질문이 제대로 지정되지 않았습니다 깨달았다. 실제로 창을 원하는 활동의 종료 후 30 일이. 그에 따라 I 예의 그룹 2 부분을 수정했다.

해결법

  1. ==============================

    1.개정 대답 :

    개정 대답 :

    여기 간단한 윈도우 함수의 트릭을 사용할 수 있습니다. 수입의 무리 :

    from pyspark.sql.functions import coalesce, col, datediff, lag, lit, sum as sum_
    from pyspark.sql.window import Window
    

    창 정의 :

    w = Window.partitionBy("group_by").orderBy("date")
    

    DateType에 날짜를 캐스트 :

    df_ = df.withColumn("date", col("date").cast("date"))
    

    다음 식을 정의합니다 :

    # Difference from the previous record or 0 if this is the first one
    diff = coalesce(datediff("date", lag("date", 1).over(w)), lit(0))
    
    # 0 if diff <= 30, 1 otherwise
    indicator = (diff > 30).cast("integer")
    
    # Cumulative sum of indicators over the window
    subgroup = sum_(indicator).over(w).alias("subgroup")
    

    테이블에 하위 그룹 식을 추가 :

    df_.select("*", subgroup).groupBy("group_by", "subgroup").avg("get_avg")
    
    +--------+--------+------------+
    |group_by|subgroup|avg(get_avg)|
    +--------+--------+------------+
    |  group1|       0|         5.0|
    |  group2|       0|        20.0|
    |  group2|       1|         8.0|
    +--------+--------+------------+
    

    첫 번째는 집계와 의미가 아니라, 열이 일정하게 증가하는 경우 당신은 분을 사용할 수 있습니다. 그렇지 않으면 당신은뿐만 아니라 윈도우 함수를 사용해야합니다.

    스파크 2.1를 사용하여 테스트. 이전 스파크 출시와 함께 사용할 때 하위 쿼리 및 윈도우 인스턴스를 필요로 할 수있다.

    원래 대답 (지정된 범위에 해당되지 않음)

    스파크 2.0 이후 당신은 윈도우 기능을 사용할 수 있어야합니다 :

    from pyspark.sql.functions import window
    
    df.groupBy(window("date", windowDuration="30 days")).count()
    

    하지만, 결과에서 볼 수

    +---------------------------------------------+-----+
    |window                                       |count|
    +---------------------------------------------+-----+
    |[2016-01-30 01:00:00.0,2016-02-29 01:00:00.0]|1    |
    |[2015-12-31 01:00:00.0,2016-01-30 01:00:00.0]|2    |
    |[2016-03-30 02:00:00.0,2016-04-29 02:00:00.0]|1    |
    +---------------------------------------------+-----+
    

    이 시간대에 올 때 당신은 조금 조심해야합니다.

  2. from https://stackoverflow.com/questions/41711716/how-to-aggregate-over-rolling-time-window-with-groups-in-spark by cc-by-sa and MIT license