복붙노트

[HADOOP] JSON 문자열 열을 여러 열로 분할

HADOOP

JSON 문자열 열을 여러 열로 분할

JSON 문자열 열에서 모든 json 필드를 열로 추출하는 일반적인 솔루션을 찾고 있습니다.

df =  spark.read.load(path)
df.show()

'path'에있는 파일의 파일 형식이 마루입니다

샘플 데이터

|id | json_data
| 1 | {"name":"abc", "depts":["dep01", "dep02"]}
| 2 | {"name":"xyz", "depts":["dep03"],"sal":100}
| 3 | {"name":"pqr", "depts":["dep02"], "address":{"city":"SF","state":"CA"}}

예상 출력

|id | name    | depts              | sal | address_city | address_state
| 1 | "abc"   | ["dep01", "dep02"] | null| null         | null
| 2 | "xyz"   | ["dep03"]          | 100 | null         | null
| 3 | "pqr"   | ["dep02"]          | null| "SF"         | "CA"

스키마가 정의 된 StructType을 만들고 'from_json'메서드를 사용하여 열을 추출 할 수 있다는 것을 알고 있습니다.

그러나이 방법에는 수동 스키마 정의가 필요합니다.

val myStruct = StructType(
  Seq(
    StructField("name", StringType),
    StructField("depts", ArrayType(StringType)),
    StructField("sal", IntegerType)
  ))

var newDf = df.withColumn("depts", from_json(col("depts"), myStruct))

스키마를 수동으로 정의하지 않고 JSON 열을 병합하는 더 좋은 방법이 있습니까? 제공된 예제에서 사용 가능한 JSON 필드를 볼 수 있습니다. 그러나 실제로 모든 필드를 찾기 위해 모든 행을 통과 할 수는 없습니다.

그래서 열의 이름이나 유형을 지정하지 않고 모든 필드를 열로 분할하는 솔루션을 찾고 있습니다.

해결법

  1. ==============================

    1.CSV 파일이고 JSON 데이터로 단 하나의 열만 오는 경우. 다음 솔루션을 사용할 수 있습니다.

    CSV 파일이고 JSON 데이터로 단 하나의 열만 오는 경우. 다음 솔루션을 사용할 수 있습니다.

    val csvDF = spark.read.option("delimiter", "|").option("inferSchema", true).option("header", true).csv("test.csv")
    val rdd = csvDF.select(" json_data").rdd.map(_.getString(0))
    val ds = rdd.toDS
    val jsonDF = spark.read.json(ds)
    val jsonDFWithID = jsonDF.withColumn("id", monotonically_increasing_id())
    val csvDFWithID = csvDF.select($"id ").withColumn("id", monotonically_increasing_id())
    val joinDF = jsonDFWithID.join(csvDFWithID, "id").drop("id")
    

    이것이 최종 데이터 프레임의 모습입니다.

    scala> joinDF.printSchema()
    root
     |-- address: struct (nullable = true)
     |    |-- city: string (nullable = true)
     |    |-- state: string (nullable = true)
     |-- depts: array (nullable = true)
     |    |-- element: string (containsNull = true)
     |-- name: string (nullable = true)
     |-- sal: long (nullable = true)
     |-- id : double (nullable = true)
    

    JSON 파일 인 경우 다음 솔루션이 작동합니다. 나를 위해. inferSchema는 완벽하게 작동합니다.

    json 파일

    ~/Downloads ▶ cat test.json
    {"id": 1, "name":"abc", "depts":["dep01", "dep02"]},
    {"id": 2, "name":"xyz", "depts" :["dep03"],"sal":100}
    

    암호

    scala> scc.read.format("json").option("inerSchema", true).load("Downloads/test.json").show()
    +--------------+---+----+----+
    |         depts| id|name| sal|
    +--------------+---+----+----+
    |[dep01, dep02]|  1| abc|null|
    |       [dep03]|  2| xyz| 100|
    +--------------+---+----+----+
    
  2. ==============================

    2.json_data가 map 유형이라고 가정하면 (아닌 경우 항상 map으로 변환 할 수 있음) getItem을 사용할 수 있습니다.

    json_data가 map 유형이라고 가정하면 (아닌 경우 항상 map으로 변환 할 수 있음) getItem을 사용할 수 있습니다.

    df = spark.createDataFrame([
        [1, {"name": "abc", "depts": ["dep01", "dep02"]}],
        [2, {"name": "xyz", "depts": ["dep03"], "sal": 100}]
    ],
        ['id', 'json_data']
    )
    
    df.select(
        df.id, 
        df.json_data.getItem('name').alias('name'), 
        df.json_data.getItem('depts').alias('depts'), 
        df.json_data.getItem('sal').alias('sal')
    ).show()
    
    +---+----+--------------+----+
    | id|name|         depts| sal|
    +---+----+--------------+----+
    |  1| abc|[dep01, dep02]|null|
    |  2| xyz|       [dep03]| 100|
    +---+----+--------------+----+
    

    열을 추출하는보다 동적 인 방법 :

    cols = ['name', 'depts', 'sal']
    df.select(df.id, *(df.json_data.getItem(col).alias(col) for col in cols)).show()
    
  3. ==============================

    3.@Gaurang Shah의 답변을 바탕으로 중첩 JSON 구조를 처리하는 솔루션을 구현하고 monotonically_increasing_id (Non-sequential) 사용과 관련된 문제를 해결했습니다.

    @Gaurang Shah의 답변을 바탕으로 중첩 JSON 구조를 처리하는 솔루션을 구현하고 monotonically_increasing_id (Non-sequential) 사용과 관련된 문제를 해결했습니다.

    이 방법에서 'populateColumnName'함수는 StructType 열을 재귀 적으로 확인하고 열 이름을 채 웁니다.

    'renameColumns'함수는 '.'을 바꾸어 열 이름을 바꿉니다. 중첩 된 json 필드를 식별하려면 '_'를 사용하십시오.

    'addIndex'함수는 JSON 열을 구문 분석 한 후 데이터 프레임에 조인하기 위해 데이터 프레임에 인덱스를 추가합니다.

    def flattenJSON(df : DataFrame, columnName: String) : DataFrame = {
    
        val indexCol = "internal_temp_id"
    
        def populateColumnName(col : StructField) : Array[String] = {
            col.dataType match {
              case struct: StructType => struct.fields.flatMap(populateColumnName).map(col.name + "." + _)
              case rest         => Array(col.name)
            }
        }
    
        def renameColumns(name : String) : String = {
            if(name contains ".") {
                name + " as " + name.replaceAll("\\.", "_")
            }
            else name
        }
    
        def addIndex(df : DataFrame) : DataFrame = {
    
            // Append "rowid" column of type Long
            val newSchema = StructType(df.schema.fields ++ Array(StructField(indexCol, LongType, false)))
    
            // Zip on RDD level
            val rddWithId = df.rdd.zipWithIndex
            // Convert back to DataFrame
            spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
        }
    
        val dfWithID = addIndex(df)
    
        val jsonDF = df.select(columnName)
    
        val ds = jsonDF.rdd.map(_.getString(0)).toDS
        val parseDF = spark.read.option("inferSchema",true).json(ds)
    
        val columnNames = parseDF.schema.fields.flatMap(populateColumnName).map(renameColumns)
    
        var resultDF = parseDF.selectExpr(columnNames:_*)
    
        val jsonDFWithID = addIndex(resultDF)
    
        val joinDF = dfWithID.join(jsonDFWithID, indexCol).drop(indexCol)
    
        joinDF
    }
    
    val res = flattenJSON(jsonDF, "address")
    
  4. from https://stackoverflow.com/questions/57779692/split-json-string-column-to-multiple-columns by cc-by-sa and MIT license