[SQL] 복잡한 조건 스파크 SQL 창 기능
SQL복잡한 조건 스파크 SQL 창 기능
이 예를 통해 설명 할 아마도 가장 쉬운 방법입니다. 내가 예를 들어 웹 사이트에 사용자 로그인의 DataFrame가 있다고 가정 :
scala> df.show(5)
+----------------+----------+
| user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
| OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows
나는 그들이 사이트에 활성 사용자가되었을 때 나타내는 이것을 컬럼에 추가하고 싶습니다. 그러나 하나주의해야 할 점은있다 : 그들은 다시 자신의 became_active 날짜 재설정에 로그인하는 경우,이 사용자가 활성 상태로 간주하고있는 시간이며,이 기간 이후에. 이 기간 5 일 가정하자. 그런 다음 위의 표에서 원하는 테이블은 다음과 같이 될 것이다 :
+----------------+----------+-------------+
| user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-06| 2012-01-04|
| OprahWinfreyJr|2012-01-10| 2012-01-10|
|SirChillingtonIV|2012-01-11| 2012-01-11|
+----------------+----------+-------------+
그래서 두 번째 로그인이 왔기 때문에 유효 기간이 만료 된 후, 특히, SirChillingtonIV의 became_active 날짜는 리셋 이었지만, 활성 기간 내에 하락하기 때문에 Booooooo99900098의 became_active 날짜, 그 / 그녀가 로그인 한 두 번째 시간을 다시 설정되지 않았습니다.
내 최초의 생각은 became_active 열을 채우기 위해 느껴지 값을 사용하여 다음 지연과 윈도우 함수를 사용하는 것이 었습니다; 예를 들어, 무언가가 대략 같은 시작 :
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))
(이제까지 로그인 처음 인 경우 즉,) 또는 TMP가 null 인 경우, 규칙이 될 것 became_active 날짜를 기입하기 경우 login_date - TMP> = 5 다음 became_active = login_date; 그렇지 않으면, TMP의 다음 가장 최근 값으로 이동하여 동일한 규칙을 적용합니다. 이것은 내가 문제가 구현하는 방법을 상상하는 데 문제가 재귀 방식을 제안한다.
내 질문 :이 가능한 방법인가, 그렇다면 나는 중지 어디를 찾을 때까지, 내가 어떻게 "위로"및 TMP 이전 값을 볼 수있다? 나는 반복 처리 스파크 SQL 열 값을 통해, 내 지식, 수 없습니다. 이 결과를 달성하기위한 또 다른 방법이 있습니까?
해결법
-
==============================
1.여기에 트릭입니다. 기능에 무리를 가져 오기 :
여기에 트릭입니다. 기능에 무리를 가져 오기 :
import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}
창을 정의합니다 :
val userWindow = Window.partitionBy("user_name").orderBy("login_date") val userSessionWindow = Window.partitionBy("user_name", "session")
새로운 세션이 시작되는 지점을 찾기 :
val newSession = (coalesce( datediff($"login_date", lag($"login_date", 1).over(userWindow)), lit(0) ) > 5).cast("bigint") val sessionized = df.withColumn("session", sum(newSession).over(userWindow))
세션 당 가장 빠른 날짜 찾기 :
val result = sessionized .withColumn("became_active", min($"login_date").over(userSessionWindow)) .drop("session")
데이터 세트로 정의 :
val df = Seq( ("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"), ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"), ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"), ("SirChillingtonIV", "2012-08-11") ).toDF("user_name", "login_date")
결과는 다음과 같습니다
+----------------+----------+-------------+ | user_name|login_date|became_active| +----------------+----------+-------------+ | OprahWinfreyJr|2012-01-10| 2012-01-10| |SirChillingtonIV|2012-01-04| 2012-01-04| <- The first session for user |SirChillingtonIV|2012-01-11| 2012-01-11| <- The second session for user |SirChillingtonIV|2012-01-14| 2012-01-11| |SirChillingtonIV|2012-08-11| 2012-08-11| <- The third session for user |Booooooo99900098|2012-01-04| 2012-01-04| |Booooooo99900098|2012-01-06| 2012-01-04| +----------------+----------+-------------+
-
==============================
2.Pyspark 작업은 다른 대답을 리팩토링
Pyspark 작업은 다른 대답을 리팩토링
Pyspark에서는 아래와 같이 할 수 있습니다.
데이터 프레임을 만들고
df = sqlContext.createDataFrame( [ ("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"), ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"), ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"), ("SirChillingtonIV", "2012-08-11") ], ("user_name", "login_date"))
상기 코드는 다음과 같은 데이터 프레임을 생성
+----------------+----------+ | user_name|login_date| +----------------+----------+ |SirChillingtonIV|2012-01-04| |Booooooo99900098|2012-01-04| |Booooooo99900098|2012-01-06| | OprahWinfreyJr|2012-01-10| |SirChillingtonIV|2012-01-11| |SirChillingtonIV|2012-01-14| |SirChillingtonIV|2012-08-11| +----------------+----------+
이제 우리는 처음 login_date의 차이가 5 일 이상 밖으로 찾고 싶어요.
이를 위해 아래와 같이 할.
필요한 수입
from pyspark.sql import functions as f from pyspark.sql import Window # defining window partitions login_window = Window.partitionBy("user_name").orderBy("login_date") session_window = Window.partitionBy("user_name", "session") session_df = df.withColumn("session", f.sum((f.coalesce(f.datediff("login_date", f.lag("login_date", 1).over(login_window)), f.lit(0)) > 5).cast("int")).over(login_window))
date_diff 경우 우리는 위의 코드를 실행하면 NULL 다음 병합 기능은 0으로 NULL을 대체합니다.
+----------------+----------+-------+ | user_name|login_date|session| +----------------+----------+-------+ | OprahWinfreyJr|2012-01-10| 0| |SirChillingtonIV|2012-01-04| 0| |SirChillingtonIV|2012-01-11| 1| |SirChillingtonIV|2012-01-14| 1| |SirChillingtonIV|2012-08-11| 2| |Booooooo99900098|2012-01-04| 0| |Booooooo99900098|2012-01-06| 0| +----------------+----------+-------+ # add became_active column by finding the `min login_date` for each window partitionBy `user_name` and `session` created in above step final_df = session_df.withColumn("became_active", f.min("login_date").over(session_window)).drop("session") +----------------+----------+-------------+ | user_name|login_date|became_active| +----------------+----------+-------------+ | OprahWinfreyJr|2012-01-10| 2012-01-10| |SirChillingtonIV|2012-01-04| 2012-01-04| |SirChillingtonIV|2012-01-11| 2012-01-11| |SirChillingtonIV|2012-01-14| 2012-01-11| |SirChillingtonIV|2012-08-11| 2012-08-11| |Booooooo99900098|2012-01-04| 2012-01-04| |Booooooo99900098|2012-01-06| 2012-01-04| +----------------+----------+-------------+
from https://stackoverflow.com/questions/42448564/spark-sql-window-function-with-complex-condition by cc-by-sa and MIT license
'SQL' 카테고리의 다른 글
[SQL] SQL에서 증가 날짜의 결과 세트를 생성 (0) | 2020.03.26 |
---|---|
[SQL] 어떻게 동적 SQL 문에서 테이블 변수를 사용하는 방법? (0) | 2020.03.26 |
[SQL] 더 집계 함수는 SELECT 절에 존재하지 않는 GROUP BY 동작 (0) | 2020.03.25 |
[SQL] 절은 SQL에 문제가 어디에 순서합니까? (0) | 2020.03.25 |
[SQL] SQL 쿼리 - UNION에서 주문을 사용하여 (0) | 2020.03.25 |