복붙노트

[SCALA] unionAll을 여러 dataframes 불꽃

SCALA

unionAll을 여러 dataframes 불꽃

dataframes의 집합에 대한

val df1 = sc.parallelize(1 to 4).map(i => (i,i*10)).toDF("id","x")
val df2 = sc.parallelize(1 to 4).map(i => (i,i*100)).toDF("id","y")
val df3 = sc.parallelize(1 to 4).map(i => (i,i*1000)).toDF("id","z")

노조에 그들 모두 내가 할

df1.unionAll(df2).unionAll(df3)

dataframes의 수에 대한이 일을 더 우아하고 확장 가능한 방법의 예를 들어 있나요

Seq(df1, df2, df3) 

해결법

  1. ==============================

    1.가장 간단한 해결책은 (스파크 <2.0 조합 전체) 조합으로 줄이는 것입니다 :

    가장 간단한 해결책은 (스파크 <2.0 조합 전체) 조합으로 줄이는 것입니다 :

    val dfs = Seq(df1, df2, df3)
    dfs.reduce(_ union _)
    

    이것은 상대적으로 간결 및 오프 - 힙 스토리지에서 데이터를 이동하지 않아야하지만, 각 조합과 혈통이 계획 분석을 수행 할 수있는 비선형 시간을 필요로 확장합니다. 당신이 DataFrames 많은 수의 병합하려고하면 무슨 문제가 될 수 있습니다.

    또한 RDDs 및 사용 SparkContext.union로 변환 할 수 있습니다 :

    dfs match {
      case h :: Nil => Some(h)
      case h :: _   => Some(h.sqlContext.createDataFrame(
                         h.sqlContext.sparkContext.union(dfs.map(_.rdd)),
                         h.schema
                       ))
      case Nil  => None
    }
    

    그것은 혈통 짧은 분석 비용을 낮게 유지하지만 그렇지 않으면 직접 DataFrames를 병합보다 효율적이다.

  2. ==============================

    2.pyspark를 들어 다음을 수행 할 수 있습니다 :

    pyspark를 들어 다음을 수행 할 수 있습니다 :

    from functools import reduce
    from pyspark.sql import DataFrame
    
    dfs = [df1,df2,df3]
    df = reduce(DataFrame.unionAll, dfs)
    

    또한 dataframes의 열 순서는 일이에 대해 동일해야 가치가 아무것도 아니다. 올바른 열 주문이없는 경우에 자동으로 예기치 않은 결과를 제공 할 수 있습니다!

    당신이 pyspark 2.3 이상을 사용하는 경우에는 열을 다시 정렬 할 필요가 없습니다, 당신은 unionByName를 사용할 수 있습니다.

  3. ==============================

    3.후드 스파크에서 조합 식을 평평. 연합 선형 완료되면 그래서 시간이 더 오래 걸립니다.

    후드 스파크에서 조합 식을 평평. 연합 선형 완료되면 그래서 시간이 더 오래 걸립니다.

    가장 좋은 방법은 여러 DataFrames을 지원하는 조합 기능을 가지고 불꽃입니다.

    그러나 다음 코드는 다소 여러 DataFrames (또는 데이터 집합)의 결합을 가속화 할 수 있습니다.

      def union[T : ClassTag](datasets : TraversableOnce[Dataset[T]]) : Dataset[T] = {
          binaryReduce[Dataset[T]](datasets, _.union(_))
      }
      def binaryReduce[T : ClassTag](ts : TraversableOnce[T], op: (T, T) => T) : T = {
          if (ts.isEmpty) {
             throw new IllegalArgumentException
          }
          var array = ts toArray
          var size = array.size
          while(size > 1) {
             val newSize = (size + 1) / 2
             for (i <- 0 until newSize) {
                 val index = i*2
                 val index2 = index + 1
                 if (index2 >= size) {
                    array(i) = array(index)  // last remaining
                 } else {
                    array(i) = op(array(index), array(index2))
                 }
             }
             size = newSize
         }
         array(0)
     }
    
  4. from https://stackoverflow.com/questions/37612622/spark-unionall-multiple-dataframes by cc-by-sa and MIT license