복붙노트

[SCALA] 복잡한 유형의 쿼리 스파크 SQL의 DataFrame

SCALA

복잡한 유형의 쿼리 스파크 SQL의 DataFrame

어떻게 같은지도 / 배열과 같은 복잡한 유형과 RDD를 조회 할 수 있나요? 예를 들어, 나는이 테스트 코드를 작성할 때 :

case class Test(name: String, map: Map[String, String])
val map = Map("hello" -> "world", "hey" -> "there")
val map2 = Map("hello" -> "people", "hey" -> "you")
val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2)))

나는 구문은 같은 것이라고 생각 :

sqlContext.sql("SELECT * FROM rdd WHERE map.hello = world")

또는

sqlContext.sql("SELECT * FROM rdd WHERE map[hello] = world")

하지만 난 얻을

각기.

해결법

  1. ==============================

    1.이 컬럼의 유형에 따라 달라집니다. 일부 더미 데이터로 시작하자 :

    이 컬럼의 유형에 따라 달라집니다. 일부 더미 데이터로 시작하자 :

    import org.apache.spark.sql.functions.{udf, lit}
    import scala.util.Try
    
    case class SubRecord(x: Int)
    case class ArrayElement(foo: String, bar: Int, vals: Array[Double])
    case class Record(
      an_array: Array[Int], a_map: Map[String, String], 
      a_struct: SubRecord, an_array_of_structs: Array[ArrayElement])
    
    
    val df = sc.parallelize(Seq(
      Record(Array(1, 2, 3), Map("foo" -> "bar"), SubRecord(1),
             Array(
               ArrayElement("foo", 1, Array(1.0, 2.0, 2.0)),
               ArrayElement("bar", 2, Array(3.0, 4.0, 5.0)))),
      Record(Array(4, 5, 6), Map("foz" -> "baz"), SubRecord(2),
             Array(ArrayElement("foz", 3, Array(5.0, 6.0)), 
                   ArrayElement("baz", 4, Array(7.0, 8.0))))
    )).toDF
    
    df.registerTempTable("df")
    df.printSchema
    
    // root
    // |-- an_array: array (nullable = true)
    // |    |-- element: integer (containsNull = false)
    // |-- a_map: map (nullable = true)
    // |    |-- key: string
    // |    |-- value: string (valueContainsNull = true)
    // |-- a_struct: struct (nullable = true)
    // |    |-- x: integer (nullable = false)
    // |-- an_array_of_structs: array (nullable = true)
    // |    |-- element: struct (containsNull = true)
    // |    |    |-- foo: string (nullable = true)
    // |    |    |-- bar: integer (nullable = false)
    // |    |    |-- vals: array (nullable = true)
    // |    |    |    |-- element: double (containsNull = false)
    

    노트:

  2. ==============================

    2.당신은 DF로 변환되면, u는 단순히 데이터를 가져올 수 있습니다

    당신은 DF로 변환되면, u는 단순히 데이터를 가져올 수 있습니다

      val rddRow= rdd.map(kv=>{
        val k = kv._1
        val v = kv._2
        Row(k, v)
      })
    
    val myFld1 =  StructField("name", org.apache.spark.sql.types.StringType, true)
    val myFld2 =  StructField("map", org.apache.spark.sql.types.MapType(StringType, StringType), true)
    val arr = Array( myFld1, myFld2)
    val schema = StructType( arr )
    val rowrddDF = sqc.createDataFrame(rddRow, schema)
    rowrddDF.registerTempTable("rowtbl")  
    val rowrddDFFinal = rowrddDF.select(rowrddDF("map.one"))
    or
    val rowrddDFFinal = rowrddDF.select("map.one")
    
  3. ==============================

    3.여기에 내가 한 일이었고, 그것은 일

    여기에 내가 한 일이었고, 그것은 일

    case class Test(name: String, m: Map[String, String])
    val map = Map("hello" -> "world", "hey" -> "there")
    val map2 = Map("hello" -> "people", "hey" -> "you")
    val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2)))
    val rdddf = rdd.toDF
    rdddf.registerTempTable("mytable")
    sqlContext.sql("select m.hello from mytable").show
    

    결과

    +------+
    | hello|
    +------+
    | world|
    |people|
    +------+
    
  4. from https://stackoverflow.com/questions/28332494/querying-spark-sql-dataframe-with-complex-types by cc-by-sa and MIT license