[SQL] 어떻게 그룹에 스파크 SQL에서 시간 간격으로
SQL어떻게 그룹에 스파크 SQL에서 시간 간격으로
내 데이터 세트는 다음과 같습니다 :
KEY |Event_Type | metric | Time
001 |event1 | 10 | 2016-05-01 10:50:51
002 |event2 | 100 | 2016-05-01 10:50:53
001 |event3 | 20 | 2016-05-01 10:50:55
001 |event1 | 15 | 2016-05-01 10:51:50
003 |event1 | 13 | 2016-05-01 10:55:30
001 |event2 | 12 | 2016-05-01 10:57:00
001 |event3 | 11 | 2016-05-01 11:00:01
나는 이것을 확인하는 것이 모든 경우 열쇠를 얻으려면 :
"특정 이벤트에 대한 통계의 SUM"> 오분 동안 임계 값.
이는 슬라이딩 윈도우 기능을 사용하여 나에게 완벽한 후보를 나타납니다.
어떻게 스파크 SQL이 할 수 있습니까?
감사합니다.
해결법
-
==============================
1.스파크> = 2.0
스파크> = 2.0
당신은 창을 사용할 수 있습니다 (창 기능을 착각하지 말 것). 더 하나, 그것은 타임 스탬프를 할당하는 변형에 잠재적으로 중복 버킷을 따라 :
df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric") // +---+---------------------------------------------+-----------+ // |KEY|window |sum(metric)| // +---+---------------------------------------------+-----------+ // |001|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|45 | // |001|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|12 | // |003|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|13 | // |001|[2016-05-01 11:00:00.0,2016-05-01 11:05:00.0]|11 | // |002|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|100 | // +---+---------------------------------------------+-----------+
스파크 <2.0
예를 들어 데이터를 시작할 수 있습니다 :
import spark.implicits._ // import sqlContext.implicits._ in Spark < 2.0 val df = Seq( ("001", "event1", 10, "2016-05-01 10:50:51"), ("002", "event2", 100, "2016-05-01 10:50:53"), ("001", "event3", 20, "2016-05-01 10:50:55"), ("001", "event1", 15, "2016-05-01 10:51:50"), ("003", "event1", 13, "2016-05-01 10:55:30"), ("001", "event2", 12, "2016-05-01 10:57:00"), ("001", "event3", 11, "2016-05-01 11:00:01") ).toDF("KEY", "Event_Type", "metric", "Time")
그 이벤트가 KEY로 식별됩니다 가정합니다. 그렇지 않은 경우는 GROUP BY를 조정할 수 있습니다 / 파티션을 BY 절 요구 사항에 따라.
당신은 데이터의 정적 창 독립과 통합에 관심이 있다면 숫자 데이터 유형 및 라운드에 타임 스탬프로 변환
import org.apache.spark.sql.functions.{round, sum} // cast string to timestamp val ts = $"Time".cast("timestamp").cast("long") // Round to 300 seconds interval val interval = (round(ts / 300L) * 300.0).cast("timestamp").alias("interval") df.groupBy($"KEY", interval).sum("metric") // +---+---------------------+-----------+ // |KEY|interval |sum(metric)| // +---+---------------------+-----------+ // |001|2016-05-01 11:00:00.0|11 | // |001|2016-05-01 10:55:00.0|12 | // |001|2016-05-01 10:50:00.0|45 | // |003|2016-05-01 10:55:00.0|13 | // |002|2016-05-01 10:50:00.0|100 | // +---+---------------------+-----------+
당신은 현재 행 사용 윈도우 기능 윈도우 상대에 관심이 있다면 :
import org.apache.spark.sql.expressions.Window // Partition by KEY // Order by timestamp // Consider window of -150 seconds to + 150 seconds relative to the current row val w = Window.partitionBy($"KEY").orderBy("ts").rangeBetween(-150, 150) df.withColumn("ts", ts).withColumn("window_sum", sum($"metric").over(w)) // +---+----------+------+-------------------+----------+----------+ // |KEY|Event_Type|metric|Time |ts |window_sum| // +---+----------+------+-------------------+----------+----------+ // |003|event1 |13 |2016-05-01 10:55:30|1462092930|13 | // |001|event1 |10 |2016-05-01 10:50:51|1462092651|45 | // |001|event3 |20 |2016-05-01 10:50:55|1462092655|45 | // |001|event1 |15 |2016-05-01 10:51:50|1462092710|45 | // |001|event2 |12 |2016-05-01 10:57:00|1462093020|12 | // |001|event3 |11 |2016-05-01 11:00:01|1462093201|11 | // |002|event2 |100 |2016-05-01 10:50:53|1462092653|100 | // +---+----------+------+-------------------+----------+----------+
성능상의 이유로이 방법은 데이터가 여러 별도의 그룹으로 분할 할 수있는 경우에만 유용합니다. 스파크 <2.0.0에서 당신은 또한 작업 할 HiveContext이 필요합니다.
-
==============================
2.정적 경계를 위해 당신은 다음을 수행 할 수
정적 경계를 위해 당신은 다음을 수행 할 수
1) 변환 (지도 mapPartitions 등) 시간 값 형태 YYYY-MM-DD-HH-mm mm 5 분간 수준에서 올리고있다. 예를 들면 01, 02, 03, 05 05가된다; 16,17,18,19,20 20된다
2) EVENT_TYPE와 시간 GROUPBY 또는 reduceBy을 수행하고 통계에 집계 (합계)를 수행
3) 필터 측정 항목> (5)에 필터 변환을 수행
당신은 거의 같은 방식으로 스파크 RDD 또는 dataframe (SQL)에 이상 쓸 수 있습니다.
경계 00-05, 01-06, 02-07의 다른 유형의 당신은 창을 슬라이딩의 개념으로보고 시도해야합니다. 데이터 섭취 유스 케이스 맞는 패턴을 스트리밍 경우이 같은 사용자 지정 솔루션을 찾을 수 있습니다, 그렇지 않으면 완벽 할 것입니다 API를 스트리밍 스파크 : 아파치 스파크 - 임시 RDDs에서 Windows를 슬라이딩 다루기
from https://stackoverflow.com/questions/37632238/how-to-group-by-time-interval-in-spark-sql by cc-by-sa and MIT license
'SQL' 카테고리의 다른 글
[SQL] 비트 필드에 MIN 집계 함수를 적용 (0) | 2020.06.26 |
---|---|
[SQL] 어떻게 PostgreSQL의 쿼리 문 여러 사용 하는가? (0) | 2020.06.26 |
[SQL] SQL : 적어도 N 시간에 발생하는 열 값을 가진 행을 선택? (0) | 2020.06.26 |
[SQL] 는 SQL 서버 데이터베이스 테이블이 개 중복 업데이트 한 (0) | 2020.06.26 |
[SQL] 에 "연관 테이블"(대다 관계)에 대한 적절한 이름 기능 [폐쇄] (0) | 2020.06.26 |