[SQL] 어떻게 스파크 RDD에 대한 SQL ROW_NUMBER 동등한를받을 수 있나요?
SQL어떻게 스파크 RDD에 대한 SQL ROW_NUMBER 동등한를받을 수 있나요?
나는 많은 열이있는 데이터 테이블 row_numbers의 전체 목록을 생성해야합니다.
SQL에서, 이것은 다음과 같이 보일 것이다 :
select
key_value,
col1,
col2,
col3,
row_number() over (partition by key_value order by col1, col2 desc, col3)
from
temp
;
자,하자 내가 내 항목처럼 V = (col1, col2, col3)로, 그래서 양식 (K, V)의 RDD이 불꽃에 말
(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.
나는 등 sortBy (), sortWith (), sortByKey (), zipWithIndex, 등이 사용하여 명령을 주문하고 올바른 ROW_NUMBER와 새로운 RDD을 갖고 싶어
(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.
(양식도 (K, (COL1, COL2, COL3, ROWNUM을) 할 수 있도록 내가 대신), 괄호에 대해 걱정하지 않는다)
이걸 어떻게해야합니까?
여기 내 첫 번째 시도는 다음과 같습니다
val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))
val temp1 = sc.parallelize(sample_data)
temp1.collect().foreach(println)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3)
temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println)
// ((((1,2),1,2,3),1),0)
// ((((1,2),1,4,7),1),1)
// ((((1,2),2,2,3),1),2)
// ((((3,4),5,5,5),1),3)
// ((((3,4),5,5,9),1),4)
// ((((3,4),7,5,5),1),5)
// note that this isn't ordering with a partition on key value K!
val temp2 = temp1.???
또한 함수는 sortBy RDD에 직접 적용 할 수는 없으므로, 그러나 하나의 제) (수집 실행해야하고, 출력은 RDD없고, 어느하지만 배열
temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)
// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
여기에 좀 더 진행하지만 여전히 분할되지 :
val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1))
temp2.collect().foreach(println)
// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,4)
// ((3,4),5,5,9,5)
// ((3,4),7,5,5,6)
해결법
-
==============================
1.ROW_NUMBER () 기능 이상 (기준 ... 의해 순서 파티션) 1.4을 촉발 하였다. 이 답변 PySpark / DataFrames를 사용합니다.
ROW_NUMBER () 기능 이상 (기준 ... 의해 순서 파티션) 1.4을 촉발 하였다. 이 답변 PySpark / DataFrames를 사용합니다.
테스트 DataFrame 만들기 :
from pyspark.sql import Row, functions as F testDF = sc.parallelize( (Row(k="key1", v=(1,2,3)), Row(k="key1", v=(1,4,7)), Row(k="key1", v=(2,2,3)), Row(k="key2", v=(5,5,5)), Row(k="key2", v=(5,5,9)), Row(k="key2", v=(7,5,5)) ) ).toDF()
분할 된 행 번호를 추가 :
from pyspark.sql.window import Window (testDF .select("k", "v", F.rowNumber() .over(Window .partitionBy("k") .orderBy("k") ) .alias("rowNum") ) .show() ) +----+-------+------+ | k| v|rowNum| +----+-------+------+ |key1|[1,2,3]| 1| |key1|[1,4,7]| 2| |key1|[2,2,3]| 3| |key2|[5,5,5]| 1| |key2|[5,5,9]| 2| |key2|[7,5,5]| 3| +----+-------+------+
-
==============================
2.이것은 당신이 양육하고 흥미로운 문제입니다. 파이썬에 대답하지만 난 당신이 스칼라로 완벽하게 번역 할 수있을 것입니다 확신합니다.
이것은 당신이 양육하고 흥미로운 문제입니다. 파이썬에 대답하지만 난 당신이 스칼라로 완벽하게 번역 할 수있을 것입니다 확신합니다.
여기에 내가 그것을 해결 할 방법은 다음과 같습니다
1 데이터를 단순화 :
temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3])))
TEMP2는 이제 "진짜"키 - 값 쌍입니다. 그것은 그 다음과 같습니다
[ ((3, 4), (5, 5, 5)), ((3, 4), (5, 5, 9)), ((3, 4), (7, 5, 5)), ((1, 2), (1, 2, 3)), ((1, 2), (1, 4, 7)), ((1, 2), (2, 2, 3))
]
2- 그리고, PARTITION BY 효과를 재현하는 함수 군 - 더를 사용
temp3 = temp2.groupByKey()
2 행과 RDD 지금 TEMP3입니다 :
[((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>), ((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)]
3 이제, 당신은 RDD의 각 값에 대한 순위 기능을 적용해야합니다. 파이썬에서, 나는 간단한 정렬 기능을 사용합니다 (열거는 ROW_NUMBER 열을 생성합니다) :
temp4 = temp3.flatMap(lambda x: tuple([(x[0],(i[1],i[0])) for i in enumerate(sorted(x[1]))])).take(10)
특정 순서를 구현하기 위해, 당신은 파이썬에서 (오른쪽 "키"인수를 공급해야하는 것으로, 난 그냥 그와 같은 람다 함수를 만들 것입니다 :
lambda tuple : (tuple[0],-tuple[1],tuple[2])
마지막에 (키 인수 기능이없는, 그 모양) :
[ ((1, 2), ((1, 2, 3), 0)), ((1, 2), ((1, 4, 7), 1)), ((1, 2), ((2, 2, 3), 2)), ((3, 4), ((5, 5, 5), 0)), ((3, 4), ((5, 5, 9), 1)), ((3, 4), ((7, 5, 5), 2))
]
희망이 도움이!
행운을 빕니다.
-
==============================
3.
val test = Seq(("key1", (1,2,3)),("key1",(4,5,6)), ("key2", (7,8,9)), ("key2", (0,1,2)))
시험 서열 [(문자열 (INT, INT, INT))] = 목록 ((KEY1, (1,2,3)) (KEY1 (4,5,6)) (키 2, (7,8 9)) (키 2, (0,1,2)))
test.foreach(println)
(KEY1, (1,2,3))
(KEY1 (4,5,6))
(키 2, (7,8,9))
(키 2, (0,1,2))
val rdd = sc.parallelize(test, 2)
EET : org.apache.spark.rdd.RDD [(문자열 (INT, INT, INT))] = ParallelCollectionRDD [41]로 병렬화 : 26
val rdd1 = rdd.groupByKey.map(x => (x._1,x._2.toArray)).map(x => (x._1, x._2.sortBy(x => x._1).zipWithIndex))
rdd1 : org.apache.spark.rdd.RDD [(문자열 배열 [((INT, INT, INT), INT)])] = MapPartitionsRDD [44]에 매핑한다 : 25
val rdd2 = rdd1.flatMap{ elem => val key = elem._1 elem._2.map(row => (key, row._1, row._2)) }
rdd2 : org.apache.spark.rdd.RDD [(문자열 (INT, INT, INT), INT) = MapPartitionsRDD [45] flatMap 행에 25
rdd2.collect.foreach(println)
(KEY1, (1,2,3), 0)
(KEY1, (4,5,6), 1)
(키 2, (0,1,2), 0)
(키 2, (7,8,9), 1)
-
==============================
4.스파크 SQL에서 데이터 파일을 읽기 ... 발 DF = spark.read.json ( "S3 : // s3bukcet / 키 / 활동 / 년 = 2018 / 월 = 12 / 날짜 = 15 / *");
스파크 SQL에서 데이터 파일을 읽기 ... 발 DF = spark.read.json ( "S3 : // s3bukcet / 키 / 활동 / 년 = 2018 / 월 = 12 / 날짜 = 15 / *");
위의 파일 필드는 페이지 뷰와 클릭 수를 USER_ID있다
클릭에 의해 USER_ID 순서에 의해 구획 활성 이드 (ROW_NUMBER)를 생성 발 출력 = df.withColumn ( "activity_id"functions.row_number () 이상 (Window.partitionBy ( "USER_ID") 해 orderBy ( "클릭")) 캐스트 (DataTypes.IntegerType)...);
from https://stackoverflow.com/questions/27050247/how-do-i-get-a-sql-row-number-equivalent-for-a-spark-rdd by cc-by-sa and MIT license
'SQL' 카테고리의 다른 글
[SQL] (전치?)를 폭발 스파크 SQL 테이블에서 여러 열을 (0) | 2020.04.20 |
---|---|
[SQL] SQL : 변수에 따라 선택 동적 열 이름 (0) | 2020.04.20 |
[SQL] "최고의 경기"에 의해 MySQL의 순서 (0) | 2020.04.20 |
[SQL] SQL 서버 : 나는 SYS 테이블을 통해 INFORMATION_SCHEMA 테이블을 사용해야합니까? (0) | 2020.04.20 |
[SQL] 오라클은 : 큰 XML 파일을로드? (0) | 2020.04.20 |