[SCALA] unionAll을 여러 dataframes 불꽃
SCALAunionAll을 여러 dataframes 불꽃
dataframes의 집합에 대한
val df1 = sc.parallelize(1 to 4).map(i => (i,i*10)).toDF("id","x")
val df2 = sc.parallelize(1 to 4).map(i => (i,i*100)).toDF("id","y")
val df3 = sc.parallelize(1 to 4).map(i => (i,i*1000)).toDF("id","z")
노조에 그들 모두 내가 할
df1.unionAll(df2).unionAll(df3)
dataframes의 수에 대한이 일을 더 우아하고 확장 가능한 방법의 예를 들어 있나요
Seq(df1, df2, df3)
해결법
-
==============================
1.가장 간단한 해결책은 (스파크 <2.0 조합 전체) 조합으로 줄이는 것입니다 :
가장 간단한 해결책은 (스파크 <2.0 조합 전체) 조합으로 줄이는 것입니다 :
val dfs = Seq(df1, df2, df3) dfs.reduce(_ union _)
이것은 상대적으로 간결 및 오프 - 힙 스토리지에서 데이터를 이동하지 않아야하지만, 각 조합과 혈통이 계획 분석을 수행 할 수있는 비선형 시간을 필요로 확장합니다. 당신이 DataFrames 많은 수의 병합하려고하면 무슨 문제가 될 수 있습니다.
또한 RDDs 및 사용 SparkContext.union로 변환 할 수 있습니다 :
dfs match { case h :: Nil => Some(h) case h :: _ => Some(h.sqlContext.createDataFrame( h.sqlContext.sparkContext.union(dfs.map(_.rdd)), h.schema )) case Nil => None }
그것은 혈통 짧은 분석 비용을 낮게 유지하지만 그렇지 않으면 직접 DataFrames를 병합보다 효율적이다.
-
==============================
2.pyspark를 들어 다음을 수행 할 수 있습니다 :
pyspark를 들어 다음을 수행 할 수 있습니다 :
from functools import reduce from pyspark.sql import DataFrame dfs = [df1,df2,df3] df = reduce(DataFrame.unionAll, dfs)
또한 dataframes의 열 순서는 일이에 대해 동일해야 가치가 아무것도 아니다. 올바른 열 주문이없는 경우에 자동으로 예기치 않은 결과를 제공 할 수 있습니다!
당신이 pyspark 2.3 이상을 사용하는 경우에는 열을 다시 정렬 할 필요가 없습니다, 당신은 unionByName를 사용할 수 있습니다.
-
==============================
3.후드 스파크에서 조합 식을 평평. 연합 선형 완료되면 그래서 시간이 더 오래 걸립니다.
후드 스파크에서 조합 식을 평평. 연합 선형 완료되면 그래서 시간이 더 오래 걸립니다.
가장 좋은 방법은 여러 DataFrames을 지원하는 조합 기능을 가지고 불꽃입니다.
그러나 다음 코드는 다소 여러 DataFrames (또는 데이터 집합)의 결합을 가속화 할 수 있습니다.
def union[T : ClassTag](datasets : TraversableOnce[Dataset[T]]) : Dataset[T] = { binaryReduce[Dataset[T]](datasets, _.union(_)) } def binaryReduce[T : ClassTag](ts : TraversableOnce[T], op: (T, T) => T) : T = { if (ts.isEmpty) { throw new IllegalArgumentException } var array = ts toArray var size = array.size while(size > 1) { val newSize = (size + 1) / 2 for (i <- 0 until newSize) { val index = i*2 val index2 = index + 1 if (index2 >= size) { array(i) = array(index) // last remaining } else { array(i) = op(array(index), array(index2)) } } size = newSize } array(0) }
from https://stackoverflow.com/questions/37612622/spark-unionall-multiple-dataframes by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] int로는 null이 될 수없는 경우, null.asInstanceOf [지능]은 무엇을 의미합니까? (0) | 2019.11.07 |
---|---|
[SCALA] JSObject를 어떠한 JSON 시리얼 타입 play.api.libs.json.JsObject에 대한 발견 (0) | 2019.11.07 |
[SCALA] 튜플 매개 변수 선언과 할당 기이 (0) | 2019.11.07 |
[SCALA] 예외 : ZLIB 입력 스트림의 예기치 않은 종료 (0) | 2019.11.07 |
[SCALA] 볼품에 반복적으로 이기종 목록에 케이스 클래스를 변환하려고 이상한 행동 (0) | 2019.11.07 |