복붙노트

[SCALA] 스파크 DataFrame가 열이있는 경우 어떻게 감지 할

SCALA

스파크 DataFrame가 열이있는 경우 어떻게 감지 할

나는 스파크 SQL에서 JSON 파일에서 DataFrame을 만들 때 해당 열이 ALL 기타 사항 서보 -OFF 호출하기 전에 존재하는 경우 어떻게 알 수

예 JSON 스키마 :

{
  "a": {
    "b": 1,
    "c": 2
  }
}

이것은 내가하고 싶은 것입니다 :

potential_columns = Seq("b", "c", "d")
df = sqlContext.read.json(filename)
potential_columns.map(column => if(df.hasColumn(column)) df.select(s"a.$column"))

하지만 난 hasColumn을위한 좋은 기능을 찾을 수 없습니다. 열이 다소 어색 배열에있는 경우 내가 들어 왔 가장 가까운 테스트하는 것입니다 :

scala> df.select("a.*").columns
res17: Array[String] = Array(b, c)

해결법

  1. ==============================

    1.그냥 존재하는 가정하고 시도와 함께 실패 할 수 있습니다. 보통 단순하고 임의의 중첩을 지원

    그냥 존재하는 가정하고 시도와 함께 실패 할 수 있습니다. 보통 단순하고 임의의 중첩을 지원

    import scala.util.Try
    import org.apache.spark.sql.DataFrame
    
    def hasColumn(df: DataFrame, path: String) = Try(df(path)).isSuccess
    
    val df = sqlContext.read.json(sc.parallelize(
      """{"foo": [{"bar": {"foobar": 3}}]}""" :: Nil))
    
    hasColumn(df, "foobar")
    // Boolean = false
    
    hasColumn(df, "foo")
    // Boolean = true
    
    hasColumn(df, "foo.bar")
    // Boolean = true
    
    hasColumn(df, "foo.bar.foobar")
    // Boolean = true
    
    hasColumn(df, "foo.bar.foobaz")
    // Boolean = false
    

    심지어 간단한은 :

    val columns = Seq(
      "foobar", "foo", "foo.bar", "foo.bar.foobar", "foo.bar.foobaz")
    
    columns.flatMap(c => Try(df(c)).toOption)
    // Seq[org.apache.spark.sql.Column] = List(
    //   foo, foo.bar AS bar#12, foo.bar.foobar AS foobar#13)
    

    파이썬 해당 :

    from pyspark.sql.utils import AnalysisException
    from pyspark.sql import Row
    
    
    def has_column(df, col):
        try:
            df[col]
            return True
        except AnalysisException:
            return False
    
    df = sc.parallelize([Row(foo=[Row(bar=Row(foobar=3))])]).toDF()
    
    has_column(df, "foobar")
    ## False
    
    has_column(df, "foo")
    ## True
    
    has_column(df, "foo.bar")
    ## True
    
    has_column(df, "foo.bar.foobar")
    ## True
    
    has_column(df, "foo.bar.foobaz")
    ## False
    
  2. ==============================

    2.내가 통상 사용하는 또 다른 방법

    내가 통상 사용하는 또 다른 방법

    df.columns.contains("column-name-to-check")
    

    이 부울을 반환

  3. ==============================

    3.사실 당신도 방금 dataframe 자체에 호출 할 수 있습니다, 열을 사용하기 위해 선택 호출 할 필요는 없다

    사실 당신도 방금 dataframe 자체에 호출 할 수 있습니다, 열을 사용하기 위해 선택 호출 할 필요는 없다

    // define test data
    case class Test(a: Int, b: Int)
    val testList = List(Test(1,2), Test(3,4))
    val testDF = sqlContext.createDataFrame(testList)
    
    // define the hasColumn function
    def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) = df.columns.contains(colName)
    
    // then you can just use it on the DF with a given column name
    hasColumn(testDF, "a")  // <-- true
    hasColumn(testDF, "c")  // <-- false
    

    또는 당신은 hasColumn 방법은 직접 dataframes에 사용할 수 있도록 포주 나의 서재 패턴을 사용하여 암시 적 클래스를 정의 할 수 있습니다

    implicit class DataFrameImprovements(df: org.apache.spark.sql.DataFrame) {
        def hasColumn(colName: String) = df.columns.contains(colName)
    }
    

    그럼 당신은 그것을로 사용할 수 있습니다 :

    testDF.hasColumn("a") // <-- true
    testDF.hasColumn("c") // <-- false
    
  4. ==============================

    4.는 IT가이 결정되기 전에 시도 내의 표현식을 평가하는 것으로 봅니다 최적이 아니다.

    는 IT가이 결정되기 전에 시도 내의 표현식을 평가하는 것으로 봅니다 최적이 아니다.

    대용량 데이터 세트의 경우, 스칼라에서 아래의 사용 :

    df.schema.fieldNames.contains("column_name")
    
  5. ==============================

    5.이것에 대한 다른 옵션은 df.columns하고 potential_columns에 (이 경우 교집합에) 몇 가지 배열 조작을하는 것입니다.

    이것에 대한 다른 옵션은 df.columns하고 potential_columns에 (이 경우 교집합에) 몇 가지 배열 조작을하는 것입니다.

    // Loading some data (so you can just copy & paste right into spark-shell)
    case class Document( a: String, b: String, c: String)
    val df = sc.parallelize(Seq(Document("a", "b", "c")), 2).toDF
    
    // The columns we want to extract
    val potential_columns = Seq("b", "c", "d")
    
    // Get the intersect of the potential columns and the actual columns, 
    // we turn the array of strings into column objects
    // Finally turn the result into a vararg (: _*)
    df.select(potential_columns.intersect(df.columns).map(df(_)): _*).show
    

    이 아아 위하면 내부 객체 시나리오에 대해 작동하지 않습니다. 당신은 그 스키마를보고해야합니다.

    나는 완전한 열 이름에 potential_columns을 변경하는거야

    val potential_columns = Seq("a.b", "a.c", "a.d")
    
    // Our object model
    case class Document( a: String, b: String, c: String)
    case class Document2( a: Document, b: String, c: String)
    
    // And some data...
    val df = sc.parallelize(Seq(Document2(Document("a", "b", "c"), "c2")), 2).toDF
    
    // We go through each of the fields in the schema.
    // For StructTypes we return an array of parentName.fieldName
    // For everything else we return an array containing just the field name
    // We then flatten the complete list of field names
    // Then we intersect that with our potential_columns leaving us just a list of column we want
    // we turn the array of strings into column objects
    // Finally turn the result into a vararg (: _*)
    df.select(df.schema.map(a => a.dataType match { case s : org.apache.spark.sql.types.StructType => s.fieldNames.map(x => a.name + "." + x) case _ => Array(a.name) }).flatMap(x => x).intersect(potential_columns).map(df(_)) : _*).show
    

    이것은 단지 그래서 일반적인 당신이 더 많은 일을해야 할 것입니다 수 있도록, 한 단계 깊은 간다.

  6. ==============================

    6.파이썬 솔루션을 찾고이 우연히 발견 사람들을 위해, 내가 사용 :

    파이썬 솔루션을 찾고이 우연히 발견 사람들을 위해, 내가 사용 :

    if 'column_name_to_check' in df.columns:
        # do something
    

    내가 파이썬을 사용하여 df.columns.contains의 @Jai 프라 카쉬의 대답 ( '열 이름 - 검사')를 시도했을 때, 나는 AttributeError있어 '목록'개체가 어떤 속성이없는 '포함'.

  7. ==============================

    7.당신이 스키마 정의를 사용하여 JSON을 분쇄기 경우 다음을로드 할 때 열을 확인 할 필요가 없습니다. 는 JSON 소스에없는 경우는 null의 열로 표시됩니다.

    당신이 스키마 정의를 사용하여 JSON을 분쇄기 경우 다음을로드 할 때 열을 확인 할 필요가 없습니다. 는 JSON 소스에없는 경우는 null의 열로 표시됩니다.

            val schemaJson = """
      {
          "type": "struct",
          "fields": [
              {
                "name": field1
                "type": "string",
                "nullable": true,
                "metadata": {}
              },
              {
                "name": field2
                "type": "string",
                "nullable": true,
                "metadata": {}
              }
          ]
      }
            """
        val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]
    
        val djson = sqlContext.read
        .schema(schema )
        .option("badRecordsPath", readExceptionPath)
        .json(dataPath)
    
  8. ==============================

    8.

    def hasColumn(df: org.apache.spark.sql.DataFrame, colName: String) =
      Try(df.select(colName)).isSuccess
    

    중첩 된 열 이름을 포함하여 컬럼의 존재를 확인하기 위해 위에서 언급 한 기능을 사용합니다.

  9. from https://stackoverflow.com/questions/35904136/how-do-i-detect-if-a-spark-dataframe-has-a-column by cc-by-sa and MIT license