복붙노트

[SCALA] 어떻게 스파크 DataFrames를 사용하여 JSON 데이터 열을 쿼리합니다?

SCALA

어떻게 스파크 DataFrames를 사용하여 JSON 데이터 열을 쿼리합니다?

나는 간단하게하기 위해 같은 형태하는 카산드라의 테이블이 있습니다 :

key: text
jsonData: text
blobData: blob

I이 사용 스파크 이용한 스파크 산드 커넥터의 기본 데이터 구조를 만들 수있다 :

val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
  .load()

그 기본 구조로 JSON 데이터를 확장하지만 나는 사투를 벌인거야. 나는 궁극적으로 JSON 문자열 내에서 속성을 기반으로 필터링하고 BLOB 데이터를 반환 할 수 있어야합니다. jsonData.foo = "바"와 blobData를 반환 비슷 해요. 이것은 현재 수 있습니까?

해결법

  1. ==============================

    1.스파크> = 2.4

    스파크> = 2.4

    필요한 경우, 스키마 (이 임의의 행이 스키마의 유효한 대표 있다고 가정 유의하시기 바랍니다) schema_of_json 기능을 사용하여 측정 할 수있다.

    import org.apache.spark.sql.functions.{lit, schema_of_json, from_json}
    
    val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
    df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]()))
    

    스파크> = 2.1

    당신은 from_json 기능을 사용할 수 있습니다 :

    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types._
    
    val schema = StructType(Seq(
      StructField("k", StringType, true), StructField("v", DoubleType, true)
    ))
    
    df.withColumn("jsonData", from_json($"jsonData", schema))
    

    스파크> = 1.6

    당신은 열 및 경로를 사용 get_json_object을 사용할 수 있습니다 :

    import org.apache.spark.sql.functions.get_json_object
    
    val exprs = Seq("k", "v").map(
      c => get_json_object($"jsonData", s"$$.$c").alias(c))
    
    df.select($"*" +: exprs: _*)
    

    추가 예상 유형으로 캐스트 할 수있는 개별 스트링 필드를 추출합니다.

    경로 인수 $ 선도 도트 구문을 사용하여 표현된다. 문서 루트 나타내는 (위의 코드를 문자열 보간 $를 사용하기 때문에 $$ 따라서, 탈출해야합니다.).

    스파크 <= 1.5 :

    내가 아는 한 그것은 바로 수 없습니다. 이 비슷한을 시도 할 수 있습니다 :

    val df = sc.parallelize(Seq(
      ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
      ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
    )).toDF("key", "jsonData", "blobData")
    

    나는 BLOB 필드가 JSON으로 표현 될 수 있다고 가정합니다. 그렇지 않으면 분할 및 결합을 생략 택시로 :

    import org.apache.spark.sql.Row
    
    val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
    val jsons = sqlContext.read.json(df.drop("blobData").map{
      case Row(key: String, json: String) =>
        s"""{"key": "$key", "jsonData": $json}"""
    }) 
    
    val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
    parsed.printSchema
    
    // root
    //  |-- jsonData: struct (nullable = true)
    //  |    |-- k: string (nullable = true)
    //  |    |-- v: double (nullable = true)
    //  |-- key: long (nullable = true)
    //  |-- blobData: string (nullable = true)
    

    대안 (저렴 더 복잡하지만) 접근법 JSON 출력 구조체 또는 맵 항목을 분석하기위한 UDF를 사용하는 것이다. 이 같은 예를 들어 뭔가 :

    import net.liftweb.json.parse
    
    case class KV(k: String, v: Int)
    
    val parseJson = udf((s: String) => {
      implicit val formats = net.liftweb.json.DefaultFormats
      parse(s).extract[KV]
    })
    
    val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
    parsed.show
    
    // +---+--------------------+------------------+----------+
    // |key|            jsonData|          blobData|parsedJSON|
    // +---+--------------------+------------------+----------+
    // |  1|{"k": "foo", "v":...|some_other_field_1|   [foo,1]|
    // |  2|{"k": "bar", "v":...|some_other_field_2|   [bar,3]|
    // +---+--------------------+------------------+----------+
    
    parsed.printSchema
    
    // root
    //  |-- key: string (nullable = true)
    //  |-- jsonData: string (nullable = true)
    //  |-- blobData: string (nullable = true)
    //  |-- parsedJSON: struct (nullable = true)
    //  |    |-- k: string (nullable = true)
    //  |    |-- v: integer (nullable = false)
    
  2. ==============================

    2.from_json 기능은 당신이 찾고있는 정확하게이다. 코드는 같이 보일 것입니다 :

    from_json 기능은 당신이 찾고있는 정확하게이다. 코드는 같이 보일 것입니다 :

    val df = sqlContext.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
      .load()
    
    //You can define whatever struct type that your json states
    val schema = StructType(Seq(
      StructField("key", StringType, true), 
      StructField("value", DoubleType, true)
    ))
    
    df.withColumn("jsonData", from_json(col("jsonData"), schema))
    
  3. ==============================

    3.기본 JSON 문자열입니다

    기본 JSON 문자열입니다

    "{ \"column_name1\":\"value1\",\"column_name2\":\"value2\",\"column_name3\":\"value3\",\"column_name5\":\"value5\"}";
    

    다음은 JSON을 필터링하고 카산드라에 필요한 데이터를로드하는 스크립트입니다.

      sqlContext.read.json(rdd).select("column_name1 or fields name in Json", "column_name2","column_name2")
                .write.format("org.apache.spark.sql.cassandra")
                .options(Map("table" -> "Table_name", "keyspace" -> "Key_Space_name"))
                .mode(SaveMode.Append)
                .save()
    
  4. ==============================

    4.나는 다음을 사용

    나는 다음을 사용

    (사용 가능한 2.2.0 이후, 그리고 난 당신의 JSON 문자열 열은 열 인덱스 0이라고 가정하고)

    def parse(df: DataFrame, spark: SparkSession): DataFrame = {
        val stringDf = df.map((value: Row) => value.getString(0), Encoders.STRING)
        spark.read.json(stringDf)
    }
    

    그것은 자동으로 JSON에서 스키마를 추론 할 것이다. 여기 문서화 : https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/DataFrameReader.html

  5. from https://stackoverflow.com/questions/34069282/how-to-query-json-data-column-using-spark-dataframes by cc-by-sa and MIT license