복붙노트

[SQL] 어떻게 스파크 RDD에 대한 SQL ROW_NUMBER 동등한를받을 수 있나요?

SQL

어떻게 스파크 RDD에 대한 SQL ROW_NUMBER 동등한를받을 수 있나요?

나는 많은 열이있는 데이터 테이블 row_numbers의 전체 목록을 생성해야합니다.

SQL에서, 이것은 다음과 같이 보일 것이다 :

select
   key_value,
   col1,
   col2,
   col3,
   row_number() over (partition by key_value order by col1, col2 desc, col3)
from
   temp
;

자,하자 내가 내 항목처럼 V = (col1, col2, col3)로, 그래서 양식 (K, V)의 RDD이 불꽃에 말

(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.

나는 등 sortBy (), sortWith (), sortByKey (), zipWithIndex, 등이 사용하여 명령을 주문하고 올바른 ROW_NUMBER와 새로운 RDD을 갖고 싶어

(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.

(양식도 (K, (COL1, COL2, COL3, ROWNUM을) 할 수 있도록 내가 대신), 괄호에 대해 걱정하지 않는다)

이걸 어떻게해야합니까?

여기 내 첫 번째 시도는 다음과 같습니다

val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))

val temp1 = sc.parallelize(sample_data)

temp1.collect().foreach(println)

// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3)

temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println)

// ((((1,2),1,2,3),1),0)
// ((((1,2),1,4,7),1),1)
// ((((1,2),2,2,3),1),2)
// ((((3,4),5,5,5),1),3)
// ((((3,4),5,5,9),1),4)
// ((((3,4),7,5,5),1),5)

// note that this isn't ordering with a partition on key value K!

val temp2 = temp1.???

또한 함수는 sortBy RDD에 직접 적용 할 수는 없으므로, 그러나 하나의 제) (수집 실행해야하고, 출력은 RDD없고, 어느하지만 배열

temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)

// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)

여기에 좀 더 진행하지만 여전히 분할되지 :

val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1))

temp2.collect().foreach(println)

// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,4)
// ((3,4),5,5,9,5)
// ((3,4),7,5,5,6)

해결법

  1. ==============================

    1.ROW_NUMBER () 기능 이상 (기준 ... 의해 순서 파티션) 1.4을 촉발 하였다. 이 답변 PySpark / DataFrames를 사용합니다.

    ROW_NUMBER () 기능 이상 (기준 ... 의해 순서 파티션) 1.4을 촉발 하였다. 이 답변 PySpark / DataFrames를 사용합니다.

    테스트 DataFrame 만들기 :

    from pyspark.sql import Row, functions as F
    
    testDF = sc.parallelize(
        (Row(k="key1", v=(1,2,3)),
         Row(k="key1", v=(1,4,7)),
         Row(k="key1", v=(2,2,3)),
         Row(k="key2", v=(5,5,5)),
         Row(k="key2", v=(5,5,9)),
         Row(k="key2", v=(7,5,5))
        )
    ).toDF()
    

    분할 된 행 번호를 추가 :

    from pyspark.sql.window import Window
    
    (testDF
     .select("k", "v",
             F.rowNumber()
             .over(Window
                   .partitionBy("k")
                   .orderBy("k")
                  )
             .alias("rowNum")
            )
     .show()
    )
    
    +----+-------+------+
    |   k|      v|rowNum|
    +----+-------+------+
    |key1|[1,2,3]|     1|
    |key1|[1,4,7]|     2|
    |key1|[2,2,3]|     3|
    |key2|[5,5,5]|     1|
    |key2|[5,5,9]|     2|
    |key2|[7,5,5]|     3|
    +----+-------+------+
    
  2. ==============================

    2.이것은 당신이 양육하고 흥미로운 문제입니다. 파이썬에 대답하지만 난 당신이 스칼라로 완벽하게 번역 할 수있을 것입니다 확신합니다.

    이것은 당신이 양육하고 흥미로운 문제입니다. 파이썬에 대답하지만 난 당신이 스칼라로 완벽하게 번역 할 수있을 것입니다 확신합니다.

    여기에 내가 그것을 해결 할 방법은 다음과 같습니다

    1 데이터를 단순화 :

    temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3])))
    

    TEMP2는 이제 "진짜"키 - 값 쌍입니다. 그것은 그 다음과 같습니다

    [
    ((3, 4), (5, 5, 5)),  
    ((3, 4), (5, 5, 9)),   
    ((3, 4), (7, 5, 5)),   
    ((1, 2), (1, 2, 3)),  
    ((1, 2), (1, 4, 7)),   
    ((1, 2), (2, 2, 3))
    

    ]

    2- 그리고, PARTITION BY 효과를 재현하는 함수 군 - 더를 사용

    temp3 = temp2.groupByKey()
    

    2 행과 RDD 지금 TEMP3입니다 :

    [((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>),  
     ((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)]
    

    3 이제, 당신은 RDD의 각 값에 대한 순위 기능을 적용해야합니다. 파이썬에서, 나는 간단한 정렬 기능을 사용합니다 (열거는 ROW_NUMBER 열을 생성합니다) :

     temp4 = temp3.flatMap(lambda x: tuple([(x[0],(i[1],i[0])) for i in enumerate(sorted(x[1]))])).take(10)
    

    특정 순서를 구현하기 위해, 당신은 파이썬에서 (오른쪽 "키"인수를 공급해야하는 것으로, 난 그냥 그와 같은 람다 함수를 만들 것입니다 :

    lambda tuple : (tuple[0],-tuple[1],tuple[2])
    

    마지막에 (키 인수 기능이없는, 그 모양) :

    [
    ((1, 2), ((1, 2, 3), 0)), 
    ((1, 2), ((1, 4, 7), 1)), 
    ((1, 2), ((2, 2, 3), 2)), 
    ((3, 4), ((5, 5, 5), 0)), 
    ((3, 4), ((5, 5, 9), 1)), 
    ((3, 4), ((7, 5, 5), 2))
    

    ]

    희망이 도움이!

    행운을 빕니다.

  3. ==============================

    3.

    val test = Seq(("key1", (1,2,3)),("key1",(4,5,6)), ("key2", (7,8,9)), ("key2", (0,1,2)))
    

    시험 서열 [(문자열 (INT, INT, INT))] = 목록 ((KEY1, (1,2,3)) (KEY1 (4,5,6)) (키 2, (7,8 9)) (키 2, (0,1,2)))

    test.foreach(println)
    

    (KEY1, (1,2,3))

    (KEY1 (4,5,6))

    (키 2, (7,8,9))

    (키 2, (0,1,2))

    val rdd = sc.parallelize(test, 2)
    

    EET : org.apache.spark.rdd.RDD [(문자열 (INT, INT, INT))] = ParallelCollectionRDD [41]로 병렬화 : 26

    val rdd1 = rdd.groupByKey.map(x => (x._1,x._2.toArray)).map(x => (x._1, x._2.sortBy(x => x._1).zipWithIndex))
    

    rdd1 : org.apache.spark.rdd.RDD [(문자열 배열 [((INT, INT, INT), INT)])] = MapPartitionsRDD [44]에 매핑한다 : 25

    val rdd2 = rdd1.flatMap{ 
      elem =>
       val key = elem._1
       elem._2.map(row => (key, row._1, row._2))
     }
    

    rdd2 : org.apache.spark.rdd.RDD [(문자열 (INT, INT, INT), INT) = MapPartitionsRDD [45] flatMap 행에 25

    rdd2.collect.foreach(println)
    

    (KEY1, (1,2,3), 0)

    (KEY1, (4,5,6), 1)

    (키 2, (0,1,2), 0)

    (키 2, (7,8,9), 1)

  4. ==============================

    4.스파크 SQL에서 데이터 파일을 읽기 ... 발 DF = spark.read.json ( "S3 : // s3bukcet / 키 / 활동 / 년 = 2018 / 월 = 12 / 날짜 = 15 / *");

    스파크 SQL에서 데이터 파일을 읽기 ... 발 DF = spark.read.json ( "S3 : // s3bukcet / 키 / 활동 / 년 = 2018 / 월 = 12 / 날짜 = 15 / *");

    위의 파일 필드는 페이지 뷰와 클릭 수를 USER_ID있다

    클릭에 의해 USER_ID 순서에 의해 구획 활성 이드 (ROW_NUMBER)를 생성 발 출력 = df.withColumn ( "activity_id"functions.row_number () 이상 (Window.partitionBy ( "USER_ID") 해 orderBy ( "클릭")) 캐스트 (DataTypes.IntegerType)...);

  5. from https://stackoverflow.com/questions/27050247/how-do-i-get-a-sql-row-number-equivalent-for-a-spark-rdd by cc-by-sa and MIT license