복붙노트

[SCALA] 자동으로 우아하게 스파크 SQL에 DataFrame 평평

SCALA

자동으로 우아하게 스파크 SQL에 DataFrame 평평

모두,

중첩 된 StructType의있는 열이있는 스파크 SQL 테이블 (마루)을 평평 우아하고 허용 방법이 있나요

예를 들면

내 스키마는 경우 :

foo
 |_bar
 |_baz
x
y
z

나는에 선택 어떻게 수동으로 실행에 의존하지 않고 테이블 형식을 평평

df.select("foo.bar","foo.baz","x","y","z")

즉, 내가 어떻게 프로그래밍 StructType과 DataFrame 그냥 주어진 위의 코드의 결과를 얻을 수 있습니까

해결법

  1. ==============================

    1.짧은 대답이이 작업을 수행 할 "승인"방법은 없다, 그러나 당신은 DataFrame.schema을 걷는하여 선택 (...) 문을 생성하는 재귀 함수와 매우 우아 그것을 할 수있다.

    짧은 대답이이 작업을 수행 할 "승인"방법은 없다, 그러나 당신은 DataFrame.schema을 걷는하여 선택 (...) 문을 생성하는 재귀 함수와 매우 우아 그것을 할 수있다.

    재귀 함수는 배열 [열]을 반환한다. 함수가 StructType 안타 때마다, 그 자체를 호출하고 자신의 배열 [열]에 반환 된 배열 [열]을 추가 할 것입니다.

    뭔가 같은 :

    def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
      schema.fields.flatMap(f => {
        val colName = if (prefix == null) f.name else (prefix + "." + f.name)
    
        f.dataType match {
          case st: StructType => flattenSchema(st, colName)
          case _ => Array(col(colName))
        }
      })
    }
    

    당신은 다음과 같이 사용합니다 :

    df.select(flattenSchema(df.schema):_*)
    
  2. ==============================

    2.나는 내 이전 대답을 개선하고 허용 대답의 의견에 명시된 내 자신의 문제에 대한 솔루션을 제공하고있다.

    나는 내 이전 대답을 개선하고 허용 대답의 의견에 명시된 내 자신의 문제에 대한 솔루션을 제공하고있다.

    이 허용 솔루션은 열 객체의 배열을 만든 다음 열을 선택하는 데 사용합니다. 중첩 된 DataFrame이있는 경우 스파크에서,이 같은 자식 열을 선택할 수 있습니다 df.select ( "Parent.Child")와 자식 컬럼의 값이 리턴한다 DataFrame을하고 아이를 지정됩니다. 당신이 다른 부모 구조의 특성에 대한 동일한 이름이있는 경우, 당신은 부모에 대한 정보를 잃고 동일한 열 이름으로 끝낼 수 있으며, 그들이 모호으로 더 이상 그들의 이름에 액세스 할 수 없습니다.

    이것은 내 문제였다.

    나는 어쩌면 다른뿐만 아니라 다른 사람에게 도움이 될 수 있습니다, 내 문제에 대한 해결책을 발견했다. 나는 별도로 flattenSchema라고 :

    val flattenedSchema = flattenSchema(df.schema)
    

    이것은 열 객체의 배열을 돌려 보냈다. 대신 마지막 수준의 아이가라는 이름의 열이있는 DataFrame을 반환하는 () 선택이를 사용하여, 나는 다음 Parent.Child 열을 선택한 후, 그것은 부모로 이름을 변경 문자열로 자신에게 원래의 열 이름을 매핑 대신 아이의 .Child (나는 또한 나의 편의를 위해 밑줄 점을 대체)

    val renamedCols = flattenedSchema.map(name => col(name.toString()).as(name.toString().replace(".","_")))
    

    원래의 대답과 같이 그리고 당신은 선택 기능을 사용할 수 있습니다 :

    var newDf = df.select(renamedCols:_*)
    
  3. ==============================

    3.이 중첩 된 객체의 어떤 수준을 지원합니다, 그래서 그것은 더 많거나 적은 @ 데이비드 그리핀의 솔루션의 번역이다 - 그냥 Pyspark에 대한 내 솔루션을 공유하고 싶었다.

    이 중첩 된 객체의 어떤 수준을 지원합니다, 그래서 그것은 더 많거나 적은 @ 데이비드 그리핀의 솔루션의 번역이다 - 그냥 Pyspark에 대한 내 솔루션을 공유하고 싶었다.

    from pyspark.sql.types import StructType, ArrayType  
    
    def flatten(schema, prefix=None):
        fields = []
        for field in schema.fields:
            name = prefix + '.' + field.name if prefix else field.name
            dtype = field.dataType
            if isinstance(dtype, ArrayType):
                dtype = dtype.elementType
    
            if isinstance(dtype, StructType):
                fields += flatten(dtype, prefix=name)
            else:
                fields.append(name)
    
        return fields
    
    
    df.select(flatten(df.schema)).show()
    
  4. ==============================

    4.========== 편집 ====

    ========== 편집 ====

    더 복잡한 스키마에 대한 몇 가지 추가 처리가 여기에있다 : https://medium.com/@lvhuyen/working-with-spark-dataframe-having-a-complex-schema-a3bce8c3f44

    ==================

    '.'당신의 필드 이름은 점과 같은 특수 문자를 @Evan V의 대답에 추가 PySpark, 하이픈 '-', ... :

    from pyspark.sql.types import StructType, ArrayType  
    
    def normalise_field(raw):
        return raw.strip().lower() \
                .replace('`', '') \
                .replace('-', '_') \
                .replace(' ', '_') \
                .strip('_')
    
    def flatten(schema, prefix=None):
        fields = []
        for field in schema.fields:
            name = "%s.`%s`" % (prefix, field.name) if prefix else "`%s`" % field.name
            dtype = field.dataType
            if isinstance(dtype, ArrayType):
                dtype = dtype.elementType
            if isinstance(dtype, StructType):
                fields += flatten(dtype, prefix=name)
            else:
                fields.append(col(name).alias(normalise_field(name)))
    
        return fields
    
    df.select(flatten(df.schema)).show()
    
  5. ==============================

    5.또한 평면으로 열을 선택하는 SQL을 사용할 수 있습니다.

    또한 평면으로 열을 선택하는 SQL을 사용할 수 있습니다.

    나는 자바에서 구현을했다 : https://gist.github.com/ebuildy/3de0e2855498e5358e4eed1a4f72ea48

    (물론, 내가 SQL 방식을 선호 당신이 불꽃 쉘을 통해 쉽게 테스트 할 수 있도록, 재귀 적 방법을 사용).

  6. ==============================

    6.여기 당신이 원하는 그 접두사로, 같은 이름의 열을 포함하는 여러 중첩 된 열을 해결할 수있는 일을하는 기능은 다음과 같습니다

    여기 당신이 원하는 그 접두사로, 같은 이름의 열을 포함하는 여러 중첩 된 열을 해결할 수있는 일을하는 기능은 다음과 같습니다

    from pyspark.sql import functions as F
    
    def flatten_df(nested_df):
        flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
        nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
    
        flat_df = nested_df.select(flat_cols +
                                   [F.col(nc+'.'+c).alias(nc+'_'+c)
                                    for nc in nested_cols
                                    for c in nested_df.select(nc+'.*').columns])
        return flat_df
    

    전에:

    root
     |-- x: string (nullable = true)
     |-- y: string (nullable = true)
     |-- foo: struct (nullable = true)
     |    |-- a: float (nullable = true)
     |    |-- b: float (nullable = true)
     |    |-- c: integer (nullable = true)
     |-- bar: struct (nullable = true)
     |    |-- a: float (nullable = true)
     |    |-- b: float (nullable = true)
     |    |-- c: integer (nullable = true)
    

    후:

    root
     |-- x: string (nullable = true)
     |-- y: string (nullable = true)
     |-- foo_a: float (nullable = true)
     |-- foo_b: float (nullable = true)
     |-- foo_c: integer (nullable = true)
     |-- bar_a: float (nullable = true)
     |-- bar_b: float (nullable = true)
     |-- bar_c: integer (nullable = true)
    
  7. ==============================

    7.나는 오픈 소스 스파크 다리아 프로젝트에 DataFrame # flattenSchema 방법을 추가했습니다.

    나는 오픈 소스 스파크 다리아 프로젝트에 DataFrame # flattenSchema 방법을 추가했습니다.

    다음은 코드 기능을 사용할 수있는 방법입니다.

    import com.github.mrpowers.spark.daria.sql.DataFrameExt._
    df.flattenSchema().show()
    
    +-------+-------+---------+----+---+
    |foo.bar|foo.baz|        x|   y|  z|
    +-------+-------+---------+----+---+
    |   this|     is|something|cool| ;)|
    +-------+-------+---------+----+---+
    

    또한 flattenSchema () 메소드를 다른 열 이름 구분 기호를 지정할 수 있습니다.

    df.flattenSchema(delimiter = "_").show()
    +-------+-------+---------+----+---+
    |foo_bar|foo_baz|        x|   y|  z|
    +-------+-------+---------+----+---+
    |   this|     is|something|cool| ;)|
    +-------+-------+---------+----+---+
    

    이 구분 매개 변수는 놀라 울 정도로 중요하다. 당신이 Redshift에있는 테이블을로드 할 스키마를 병합하는 경우, 당신은 구분 기호로 기간을 사용할 수 없습니다.

    다음은이 출력을 생성하는 전체 코드입니다.

    val data = Seq(
      Row(Row("this", "is"), "something", "cool", ";)")
    )
    
    val schema = StructType(
      Seq(
        StructField(
          "foo",
          StructType(
            Seq(
              StructField("bar", StringType, true),
              StructField("baz", StringType, true)
            )
          ),
          true
        ),
        StructField("x", StringType, true),
        StructField("y", StringType, true),
        StructField("z", StringType, true)
      )
    )
    
    val df = spark.createDataFrame(
      spark.sparkContext.parallelize(data),
      StructType(schema)
    )
    
    df.flattenSchema().show()
    

    기본 코드는 (경우에 당신이 당신의 프로젝트에 스파크 다리아 종속성을 추가하지 않으려는) 데이비드 그리핀의 코드와 유사하다.

    object StructTypeHelpers {
    
      def flattenSchema(schema: StructType, delimiter: String = ".", prefix: String = null): Array[Column] = {
        schema.fields.flatMap(structField => {
          val codeColName = if (prefix == null) structField.name else prefix + "." + structField.name
          val colName = if (prefix == null) structField.name else prefix + delimiter + structField.name
    
          structField.dataType match {
            case st: StructType => flattenSchema(schema = st, delimiter = delimiter, prefix = colName)
            case _ => Array(col(codeColName).alias(colName))
          }
        })
      }
    
    }
    
    object DataFrameExt {
    
      implicit class DataFrameMethods(df: DataFrame) {
    
        def flattenSchema(delimiter: String = ".", prefix: String = null): DataFrame = {
          df.select(
            StructTypeHelpers.flattenSchema(df.schema, delimiter, prefix): _*
          )
        }
    
      }
    
    }
    
  8. ==============================

    8.데이비드 그리 픈 및 V. 삼마 답변을 결합하려면, 당신은 단지 중복 된 열 이름을 피하면서 평평하게이 작업을 수행 할 수 있습니다 :

    데이비드 그리 픈 및 V. 삼마 답변을 결합하려면, 당신은 단지 중복 된 열 이름을 피하면서 평평하게이 작업을 수행 할 수 있습니다 :

    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.Column
    import org.apache.spark.sql.DataFrame
    
    def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
      schema.fields.flatMap(f => {
        val colName = if (prefix == null) f.name else (prefix + "." + f.name)
        f.dataType match {
          case st: StructType => flattenSchema(st, colName)
          case _ => Array(col(colName).as(colName.replace(".","_")))
        }
      })
    }
    
    def flattenDataFrame(df:DataFrame): DataFrame = {
        df.select(flattenSchema(df.schema):_*)
    }
    
    var my_flattened_json_table = flattenDataFrame(my_json_table)
    
  9. ==============================

    9.I는 바, 바즈, X, Y, Z의 5 열 평탄화 스키마 초래 한 라이너를 사용되었습니다

    I는 바, 바즈, X, Y, Z의 5 열 평탄화 스키마 초래 한 라이너를 사용되었습니다

    df.select("foo.*", "x", "y", "z")
    

    에 관해서는 것은 폭발 : 나는 일반적으로 목록을 평평하게하기위한 폭발 보유합니다. 예를 들어 당신이 문자열의 목록입니다 열 IDLIST이있는 경우, 당신은 할 수 있습니다 :

    df.withColumn("flattenedId", functions.explode(col("idList")))
      .drop("idList")
    

    즉, flattenedId라는 이름의 열이있는 새로운 Dataframe가 발생합니다 (더 이상 목록 없음)

  10. ==============================

    10.이 솔루션의 수정이지만 tailrec 표기법을 사용

    이 솔루션의 수정이지만 tailrec 표기법을 사용

    
      @tailrec
      def flattenSchema(
          splitter: String,
          fields: List[(StructField, String)],
          acc: Seq[Column]): Seq[Column] = {
        fields match {
          case (field, prefix) :: tail if field.dataType.isInstanceOf[StructType] =>
            val newPrefix = s"$prefix${field.name}."
            val newFields = field.dataType.asInstanceOf[StructType].fields.map((_, newPrefix)).toList
            flattenSchema(splitter, tail ++ newFields, acc)
    
          case (field, prefix) :: tail =>
            val colName = s"$prefix${field.name}"
            val newCol  = col(colName).as(colName.replace(".", splitter))
            flattenSchema(splitter, tail, acc :+ newCol)
    
          case _ => acc
        }
      }
      def flattenDataFrame(df: DataFrame): DataFrame = {
        val fields = df.schema.fields.map((_, ""))
        df.select(flattenSchema("__", fields.toList, Seq.empty): _*)
      }
    
  11. ==============================

    11.당신이 중첩 된 구조체와 배열 작업하는 경우, 위의 코드에 약간의 추가.

    당신이 중첩 된 구조체와 배열 작업하는 경우, 위의 코드에 약간의 추가.

    def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
        schema.fields.flatMap(f => {
          val colName = if (prefix == null) f.name else (prefix + "." + f.name)
    
          f match {
            case StructField(_, struct:StructType, _, _) => flattenSchema(struct, colName)
            case StructField(_, ArrayType(x :StructType, _), _, _) => flattenSchema(x, colName)
            case StructField(_, ArrayType(_, _), _, _) => Array(col(colName))
            case _ => Array(col(colName))
          }
        })
      }
    
    
  12. ==============================

    12.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.types.StructType
    import scala.collection.mutable.ListBuffer 
    val columns=new ListBuffer[String]()
    
    def flattenSchema(schema:StructType,prefix:String=null){
    for(i<-schema.fields){
      if(i.dataType.isInstanceOf[StructType]) {
        val columnPrefix = i.name + "."
        flattenSchema(i.dataType.asInstanceOf[StructType], columnPrefix)
      }
      else {
        if(prefix == null)
          columns.+=(i.name)
        else
          columns.+=(prefix+i.name)
      }
      }
    }
    
  13. from https://stackoverflow.com/questions/37471346/automatically-and-elegantly-flatten-dataframe-in-spark-sql by cc-by-sa and MIT license