복붙노트

[SQL] 복잡한 조건 스파크 SQL 창 기능

SQL

복잡한 조건 스파크 SQL 창 기능

이 예를 통해 설명 할 아마도 가장 쉬운 방법입니다. 내가 예를 들어 웹 사이트에 사용자 로그인의 DataFrame가 있다고 가정 :

scala> df.show(5)
+----------------+----------+
|       user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
|  OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows

나는 그들이 사이트에 활성 사용자가되었을 때 나타내는 이것을 컬럼에 추가하고 싶습니다. 그러나 하나주의해야 할 점은있다 : 그들은 다시 자신의 became_active 날짜 재설정에 로그인하는 경우,이 사용자가 활성 상태로 간주하고있는 시간이며,이 기간 이후에. 이 기간 5 일 가정하자. 그런 다음 위의 표에서 원하는 테이블은 다음과 같이 될 것이다 :

+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-11|   2012-01-11|
+----------------+----------+-------------+

그래서 두 번째 로그인이 왔기 때문에 유효 기간이 만료 된 후, 특히, SirChillingtonIV의 became_active 날짜는 리셋 이었지만, 활성 기간 내에 하락하기 때문에 Booooooo99900098의 became_active 날짜, 그 / 그녀가 로그인 한 두 번째 시간을 다시 설정되지 않았습니다.

내 최초의 생각은 became_active 열을 채우기 위해 느껴지 값을 사용하여 다음 지연과 윈도우 함수를 사용하는 것이 었습니다; 예를 들어, 무언가가 대략 같은 시작 :

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))

(이제까지 로그인 처음 인 경우 즉,) 또는 TMP가 null 인 경우, 규칙이 될 것 became_active 날짜를 기입하기 경우 login_date - TMP> = 5 다음 became_active = login_date; 그렇지 않으면, TMP의 다음 가장 최근 값으로 이동하여 동일한 규칙을 적용합니다. 이것은 내가 문제가 구현하는 방법을 상상하는 데 문제가 재귀 방식을 제안한다.

내 질문 :이 가능한 방법인가, 그렇다면 나는 중지 어디를 찾을 때까지, 내가 어떻게 "위로"및 TMP 이전 값을 볼 수있다? 나는 반복 처리 스파크 SQL 열 값을 통해, 내 지식, 수 없습니다. 이 결과를 달성하기위한 또 다른 방법이 있습니까?

해결법

  1. ==============================

    1.여기에 트릭입니다. 기능에 무리를 가져 오기 :

    여기에 트릭입니다. 기능에 무리를 가져 오기 :

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}
    

    창을 정의합니다 :

    val userWindow = Window.partitionBy("user_name").orderBy("login_date")
    val userSessionWindow = Window.partitionBy("user_name", "session")
    

    새로운 세션이 시작되는 지점을 찾기 :

    val newSession =  (coalesce(
      datediff($"login_date", lag($"login_date", 1).over(userWindow)),
      lit(0)
    ) > 5).cast("bigint")
    
    val sessionized = df.withColumn("session", sum(newSession).over(userWindow))
    

    세션 당 가장 빠른 날짜 찾기 :

    val result = sessionized
      .withColumn("became_active", min($"login_date").over(userSessionWindow))
      .drop("session")
    

    데이터 세트로 정의 :

    val df = Seq(
      ("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"),
      ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"), 
      ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"),
      ("SirChillingtonIV", "2012-08-11")
    ).toDF("user_name", "login_date")
    

    결과는 다음과 같습니다

    +----------------+----------+-------------+
    |       user_name|login_date|became_active|
    +----------------+----------+-------------+
    |  OprahWinfreyJr|2012-01-10|   2012-01-10|
    |SirChillingtonIV|2012-01-04|   2012-01-04| <- The first session for user
    |SirChillingtonIV|2012-01-11|   2012-01-11| <- The second session for user
    |SirChillingtonIV|2012-01-14|   2012-01-11| 
    |SirChillingtonIV|2012-08-11|   2012-08-11| <- The third session for user
    |Booooooo99900098|2012-01-04|   2012-01-04|
    |Booooooo99900098|2012-01-06|   2012-01-04|
    +----------------+----------+-------------+
    
  2. ==============================

    2.Pyspark 작업은 다른 대답을 리팩토링

    Pyspark 작업은 다른 대답을 리팩토링

    Pyspark에서는 아래와 같이 할 수 있습니다.

    데이터 프레임을 만들고

    df = sqlContext.createDataFrame(
    [
    ("SirChillingtonIV", "2012-01-04"), 
    ("Booooooo99900098", "2012-01-04"), 
    ("Booooooo99900098", "2012-01-06"), 
    ("OprahWinfreyJr", "2012-01-10"), 
    ("SirChillingtonIV", "2012-01-11"), 
    ("SirChillingtonIV", "2012-01-14"), 
    ("SirChillingtonIV", "2012-08-11")
    ], 
    ("user_name", "login_date"))
    

    상기 코드는 다음과 같은 데이터 프레임을 생성

    +----------------+----------+
    |       user_name|login_date|
    +----------------+----------+
    |SirChillingtonIV|2012-01-04|
    |Booooooo99900098|2012-01-04|
    |Booooooo99900098|2012-01-06|
    |  OprahWinfreyJr|2012-01-10|
    |SirChillingtonIV|2012-01-11|
    |SirChillingtonIV|2012-01-14|
    |SirChillingtonIV|2012-08-11|
    +----------------+----------+
    

    이제 우리는 처음 login_date의 차이가 5 일 이상 밖으로 찾고 싶어요.

    이를 위해 아래와 같이 할.

    필요한 수입

    from pyspark.sql import functions as f
    from pyspark.sql import Window
    
    
    # defining window partitions  
    login_window = Window.partitionBy("user_name").orderBy("login_date")
    session_window = Window.partitionBy("user_name", "session")
    
    session_df = df.withColumn("session", f.sum((f.coalesce(f.datediff("login_date", f.lag("login_date", 1).over(login_window)), f.lit(0)) > 5).cast("int")).over(login_window))
    

    date_diff 경우 우리는 위의 코드를 실행하면 NULL 다음 병합 기능은 0으로 NULL을 대체합니다.

    +----------------+----------+-------+
    |       user_name|login_date|session|
    +----------------+----------+-------+
    |  OprahWinfreyJr|2012-01-10|      0|
    |SirChillingtonIV|2012-01-04|      0|
    |SirChillingtonIV|2012-01-11|      1|
    |SirChillingtonIV|2012-01-14|      1|
    |SirChillingtonIV|2012-08-11|      2|
    |Booooooo99900098|2012-01-04|      0|
    |Booooooo99900098|2012-01-06|      0|
    +----------------+----------+-------+
    
    
    # add became_active column by finding the `min login_date` for each window partitionBy `user_name` and `session` created in above step
    final_df = session_df.withColumn("became_active", f.min("login_date").over(session_window)).drop("session")
    
    +----------------+----------+-------------+
    |       user_name|login_date|became_active|
    +----------------+----------+-------------+
    |  OprahWinfreyJr|2012-01-10|   2012-01-10|
    |SirChillingtonIV|2012-01-04|   2012-01-04|
    |SirChillingtonIV|2012-01-11|   2012-01-11|
    |SirChillingtonIV|2012-01-14|   2012-01-11|
    |SirChillingtonIV|2012-08-11|   2012-08-11|
    |Booooooo99900098|2012-01-04|   2012-01-04|
    |Booooooo99900098|2012-01-06|   2012-01-04|
    +----------------+----------+-------------+
    
  3. from https://stackoverflow.com/questions/42448564/spark-sql-window-function-with-complex-condition by cc-by-sa and MIT license