[SCALA] 스파크 및 SparkSQL : 어떻게 모방 윈도우 함수에?
SCALA스파크 및 SparkSQL : 어떻게 모방 윈도우 함수에?
dataframe 안양 감안할 때
id | date
---------------
1 | 2015-09-01
2 | 2015-09-01
1 | 2015-09-03
1 | 2015-09-04
2 | 2015-09-04
나는 실행 카운터 또는 인덱스를 만들려면,
그러므로
id | date | counter
--------------------------
1 | 2015-09-01 | 1
1 | 2015-09-03 | 2
1 | 2015-09-04 | 3
2 | 2015-09-01 | 1
2 | 2015-09-04 | 2
이는 윈도우 기능, 예를 들어,와 내가 달성 할 수있는 무언가이다
val w = Window.partitionBy("id").orderBy("date")
val resultDF = df.select( df("id"), rowNumber().over(w) )
불행하게도, 스파크 1.4.1 일반 dataframes에 대한 윈도우 기능을 지원하지 않습니다
org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;
감사!
해결법
-
==============================
1.당신은 RDDs이 작업을 수행 할 수 있습니다. RDDs이 더 많은 의미가 있습니다 위해 개인적으로 나는 API를 찾을 수 - 난 항상 내 데이터가 dataframe 같은 '평면'되고 싶지 않아.
당신은 RDDs이 작업을 수행 할 수 있습니다. RDDs이 더 많은 의미가 있습니다 위해 개인적으로 나는 API를 찾을 수 - 난 항상 내 데이터가 dataframe 같은 '평면'되고 싶지 않아.
val df = sqlContext.sql("select 1, '2015-09-01'" ).unionAll(sqlContext.sql("select 2, '2015-09-01'") ).unionAll(sqlContext.sql("select 1, '2015-09-03'") ).unionAll(sqlContext.sql("select 1, '2015-09-04'") ).unionAll(sqlContext.sql("select 2, '2015-09-04'")) // dataframe as an RDD (of Row objects) df.rdd // grouping by the first column of the row .groupBy(r => r(0)) // map each group - an Iterable[Row] - to a list and sort by the second column .map(g => g._2.toList.sortBy(row => row(1).toString)) .collect()
위는 다음과 같은 결과를 제공합니다 :
Array[List[org.apache.spark.sql.Row]] = Array( List([1,2015-09-01], [1,2015-09-03], [1,2015-09-04]), List([2,2015-09-01], [2,2015-09-04]))
당신이뿐만 아니라 '그룹'내 위치를 원한다면, 당신은 zipWithIndex를 사용할 수 있습니다.
df.rdd.groupBy(r => r(0)).map(g => g._2.toList.sortBy(row => row(1).toString).zipWithIndex).collect() Array[List[(org.apache.spark.sql.Row, Int)]] = Array( List(([1,2015-09-01],0), ([1,2015-09-03],1), ([1,2015-09-04],2)), List(([2,2015-09-01],0), ([2,2015-09-04],1)))
당신은 FlatMap를 사용하여 객체 행의 간단한 목록 / 배열이 다시 평평하게 할 수도 있지만 당신은 좋은 생각되지 않을 것 '그룹'에 무엇을 수행해야합니다.
이 같은 RDD를 사용하는 단점은 DataFrame에서 RDD하고 다시 변환하는 지루한 점이다.
-
==============================
2.당신이하지 아주 좋은 이유가없는 당신은뿐만 아니라 지역 DataFrames에 대한 HiveContext을 사용할 수 있습니다, 아마 어쨌든 좋은 아이디어이다. 그것은 (지금으로 sparkR 일반는 SqlContext를 사용하는 것)과 그 파서는 스파크 SQL 및 DataFrame 설명서에서 권장하는 스파크 쉘 및 pyspark 쉘에서 사용할 수있는 기본는 SqlContext입니다.
당신이하지 아주 좋은 이유가없는 당신은뿐만 아니라 지역 DataFrames에 대한 HiveContext을 사용할 수 있습니다, 아마 어쨌든 좋은 아이디어이다. 그것은 (지금으로 sparkR 일반는 SqlContext를 사용하는 것)과 그 파서는 스파크 SQL 및 DataFrame 설명서에서 권장하는 스파크 쉘 및 pyspark 쉘에서 사용할 수있는 기본는 SqlContext입니다.
import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions.rowNumber object HiveContextTest { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Hive Context") val sc = new SparkContext(conf) val sqlContext = new HiveContext(sc) import sqlContext.implicits._ val df = sc.parallelize( ("foo", 1) :: ("foo", 2) :: ("bar", 1) :: ("bar", 2) :: Nil ).toDF("k", "v") val w = Window.partitionBy($"k").orderBy($"v") df.select($"k", $"v", rowNumber.over(w).alias("rn")).show } }
-
==============================
3.난 완전히 DataFrames에 대한 윈도우 함수를 사용하면 스파크 버전 (> =) 1.5이있는 경우 길을 가야하는 것에 동의합니다. 당신은 이전 버전 (예 : 1.4.1) 정말 붙어있는 경우에, 여기에이 문제를 해결하기 위해 해키 방법입니다
난 완전히 DataFrames에 대한 윈도우 함수를 사용하면 스파크 버전 (> =) 1.5이있는 경우 길을 가야하는 것에 동의합니다. 당신은 이전 버전 (예 : 1.4.1) 정말 붙어있는 경우에, 여기에이 문제를 해결하기 위해 해키 방법입니다
val df = sc.parallelize((1, "2015-09-01") :: (2, "2015-09-01") :: (1, "2015-09-03") :: (1, "2015-09-04") :: (1, "2015-09-04") :: Nil) .toDF("id", "date") val dfDuplicate = df.selecExpr("id as idDup", "date as dateDup") val dfWithCounter = df.join(dfDuplicate,$"id"===$"idDup") .where($"date"<=$"dateDup") .groupBy($"id", $"date") .agg($"id", $"date", count($"idDup").as("counter")) .select($"id",$"date",$"counter")
당신은 dfWithCounter.show을 이제 경우
당신은 얻을 것이다 :
+---+----------+-------+ | id| date|counter| +---+----------+-------+ | 1|2015-09-01| 1| | 1|2015-09-04| 3| | 1|2015-09-03| 2| | 2|2015-09-01| 1| | 2|2015-09-04| 2| +---+----------+-------+
그 날짜가 정렬되지 않고, 카운터가 정확합니다. 또한 당신은 어디 성명에서 =의 <로 =>를 변경하여 카운터의 순서를 변경할 수 있습니다.
from https://stackoverflow.com/questions/32407455/spark-and-sparksql-how-to-imitate-window-function by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 임베디드 스칼라 REPL 상속 부모 클래스 경로 (0) | 2019.11.22 |
---|---|
[SCALA] 사람이 기호가 "=>"스칼라에서 어떻게 사용되는지 설명 할 수 (0) | 2019.11.22 |
[SCALA] 스칼라 여러 Seqs의 게으른 직교 제품 (0) | 2019.11.22 |
[SCALA] toSet를 호출하여 매개 변수 유형 오류를 누락? (0) | 2019.11.22 |
[SCALA] 스칼라의 미래와 약속 취소 (0) | 2019.11.22 |