복붙노트

[SQL] 어떻게 그룹에 스파크 SQL에서 시간 간격으로

SQL

어떻게 그룹에 스파크 SQL에서 시간 간격으로

내 데이터 세트는 다음과 같습니다 :

KEY |Event_Type | metric | Time 
001 |event1     | 10     | 2016-05-01 10:50:51
002 |event2     | 100    | 2016-05-01 10:50:53
001 |event3     | 20     | 2016-05-01 10:50:55
001 |event1     | 15     | 2016-05-01 10:51:50
003 |event1     | 13     | 2016-05-01 10:55:30
001 |event2     | 12     | 2016-05-01 10:57:00
001 |event3     | 11     | 2016-05-01 11:00:01

나는 이것을 확인하는 것이 모든 경우 열쇠를 얻으려면 :

"특정 이벤트에 대한 통계의 SUM"> 오분 동안 임계 값.

이는 슬라이딩 윈도우 기능을 사용하여 나에게 완벽한 후보를 나타납니다.

어떻게 스파크 SQL이 할 수 있습니까?

감사합니다.

해결법

  1. ==============================

    1.스파크> = 2.0

    스파크> = 2.0

    당신은 창을 사용할 수 있습니다 (창 기능을 착각하지 말 것). 더 하나, 그것은 타임 스탬프를 할당하는 변형에 잠재적으로 중복 버킷을 따라 :

    df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric")
    
    // +---+---------------------------------------------+-----------+
    // |KEY|window                                       |sum(metric)|
    // +---+---------------------------------------------+-----------+
    // |001|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|45         |
    // |001|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|12         |
    // |003|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|13         |
    // |001|[2016-05-01 11:00:00.0,2016-05-01 11:05:00.0]|11         |
    // |002|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|100        |
    // +---+---------------------------------------------+-----------+
    

    스파크 <2.0

    예를 들어 데이터를 시작할 수 있습니다 :

    import spark.implicits._  // import sqlContext.implicits._ in Spark < 2.0
    
    val df = Seq(
      ("001", "event1", 10, "2016-05-01 10:50:51"),
      ("002", "event2", 100, "2016-05-01 10:50:53"),
      ("001", "event3", 20, "2016-05-01 10:50:55"),
      ("001", "event1", 15, "2016-05-01 10:51:50"),
      ("003", "event1", 13, "2016-05-01 10:55:30"),
      ("001", "event2", 12, "2016-05-01 10:57:00"),
      ("001", "event3", 11, "2016-05-01 11:00:01")
    ).toDF("KEY", "Event_Type", "metric", "Time")
    

    그 이벤트가 KEY로 식별됩니다 가정합니다. 그렇지 않은 경우는 GROUP BY를 조정할 수 있습니다 / 파티션을 BY 절 요구 사항에 따라.

    당신은 데이터의 정적 창 독립과 통합에 관심이 있다면 숫자 데이터 유형 및 라운드에 타임 스탬프로 변환

    import org.apache.spark.sql.functions.{round, sum}
    
    // cast string to timestamp
    val ts = $"Time".cast("timestamp").cast("long")
    
    // Round to 300 seconds interval
    val interval = (round(ts / 300L) * 300.0).cast("timestamp").alias("interval")
    
    df.groupBy($"KEY", interval).sum("metric")
    
    // +---+---------------------+-----------+
    // |KEY|interval             |sum(metric)|
    // +---+---------------------+-----------+
    // |001|2016-05-01 11:00:00.0|11         |
    // |001|2016-05-01 10:55:00.0|12         |
    // |001|2016-05-01 10:50:00.0|45         |
    // |003|2016-05-01 10:55:00.0|13         |
    // |002|2016-05-01 10:50:00.0|100        |
    // +---+---------------------+-----------+
    

    당신은 현재 행 사용 윈도우 기능 윈도우 상대에 관심이 있다면 :

    import org.apache.spark.sql.expressions.Window
    
    // Partition by KEY
    // Order by timestamp 
    // Consider window of -150 seconds to + 150 seconds relative to the current row
    val w = Window.partitionBy($"KEY").orderBy("ts").rangeBetween(-150, 150)
    df.withColumn("ts", ts).withColumn("window_sum", sum($"metric").over(w))
    
    // +---+----------+------+-------------------+----------+----------+
    // |KEY|Event_Type|metric|Time               |ts        |window_sum|
    // +---+----------+------+-------------------+----------+----------+
    // |003|event1    |13    |2016-05-01 10:55:30|1462092930|13        |
    // |001|event1    |10    |2016-05-01 10:50:51|1462092651|45        |
    // |001|event3    |20    |2016-05-01 10:50:55|1462092655|45        |
    // |001|event1    |15    |2016-05-01 10:51:50|1462092710|45        |
    // |001|event2    |12    |2016-05-01 10:57:00|1462093020|12        |
    // |001|event3    |11    |2016-05-01 11:00:01|1462093201|11        |
    // |002|event2    |100   |2016-05-01 10:50:53|1462092653|100       |
    // +---+----------+------+-------------------+----------+----------+
    

    성능상의 이유로이 방법은 데이터가 여러 별도의 그룹으로 분할 할 수있는 경우에만 유용합니다. 스파크 <2.0.0에서 당신은 또한 작업 할 HiveContext이 필요합니다.

  2. ==============================

    2.정적 경계를 위해 당신은 다음을 수행 할 수

    정적 경계를 위해 당신은 다음을 수행 할 수

    1) 변환 (지도 mapPartitions 등) 시간 값 형태 YYYY-MM-DD-HH-mm mm 5 분간 수준에서 올리고있다. 예를 들면 01, 02, 03, 05 05가된다; 16,17,18,19,20 20된다

    2) EVENT_TYPE와 시간 GROUPBY 또는 reduceBy을 수행하고 통계에 집계 (합계)를 수행

    3) 필터 측정 항목> (5)에 필터 변환을 수행

    당신은 거의 같은 방식으로 스파크 RDD 또는 dataframe (SQL)에 이상 쓸 수 있습니다.

    경계 00-05, 01-06, 02-07의 다른 유형의 당신은 창을 슬라이딩의 개념으로보고 시도해야합니다. 데이터 섭취 유스 케이스 맞는 패턴을 스트리밍 경우이 같은 사용자 지정 솔루션을 찾을 수 있습니다, 그렇지 않으면 완벽 할 것입니다 API를 스트리밍 스파크 : 아파치 스파크 - 임시 RDDs에서 Windows를 슬라이딩 다루기

  3. from https://stackoverflow.com/questions/37632238/how-to-group-by-time-interval-in-spark-sql by cc-by-sa and MIT license