[HADOOP] JSON 문자열 열을 여러 열로 분할
HADOOPJSON 문자열 열을 여러 열로 분할
JSON 문자열 열에서 모든 json 필드를 열로 추출하는 일반적인 솔루션을 찾고 있습니다.
df = spark.read.load(path)
df.show()
'path'에있는 파일의 파일 형식이 마루입니다
샘플 데이터
|id | json_data
| 1 | {"name":"abc", "depts":["dep01", "dep02"]}
| 2 | {"name":"xyz", "depts":["dep03"],"sal":100}
| 3 | {"name":"pqr", "depts":["dep02"], "address":{"city":"SF","state":"CA"}}
예상 출력
|id | name | depts | sal | address_city | address_state
| 1 | "abc" | ["dep01", "dep02"] | null| null | null
| 2 | "xyz" | ["dep03"] | 100 | null | null
| 3 | "pqr" | ["dep02"] | null| "SF" | "CA"
스키마가 정의 된 StructType을 만들고 'from_json'메서드를 사용하여 열을 추출 할 수 있다는 것을 알고 있습니다.
그러나이 방법에는 수동 스키마 정의가 필요합니다.
val myStruct = StructType(
Seq(
StructField("name", StringType),
StructField("depts", ArrayType(StringType)),
StructField("sal", IntegerType)
))
var newDf = df.withColumn("depts", from_json(col("depts"), myStruct))
스키마를 수동으로 정의하지 않고 JSON 열을 병합하는 더 좋은 방법이 있습니까? 제공된 예제에서 사용 가능한 JSON 필드를 볼 수 있습니다. 그러나 실제로 모든 필드를 찾기 위해 모든 행을 통과 할 수는 없습니다.
그래서 열의 이름이나 유형을 지정하지 않고 모든 필드를 열로 분할하는 솔루션을 찾고 있습니다.
해결법
-
==============================
1.CSV 파일이고 JSON 데이터로 단 하나의 열만 오는 경우. 다음 솔루션을 사용할 수 있습니다.
CSV 파일이고 JSON 데이터로 단 하나의 열만 오는 경우. 다음 솔루션을 사용할 수 있습니다.
val csvDF = spark.read.option("delimiter", "|").option("inferSchema", true).option("header", true).csv("test.csv") val rdd = csvDF.select(" json_data").rdd.map(_.getString(0)) val ds = rdd.toDS val jsonDF = spark.read.json(ds) val jsonDFWithID = jsonDF.withColumn("id", monotonically_increasing_id()) val csvDFWithID = csvDF.select($"id ").withColumn("id", monotonically_increasing_id()) val joinDF = jsonDFWithID.join(csvDFWithID, "id").drop("id")
이것이 최종 데이터 프레임의 모습입니다.
scala> joinDF.printSchema() root |-- address: struct (nullable = true) | |-- city: string (nullable = true) | |-- state: string (nullable = true) |-- depts: array (nullable = true) | |-- element: string (containsNull = true) |-- name: string (nullable = true) |-- sal: long (nullable = true) |-- id : double (nullable = true)
JSON 파일 인 경우 다음 솔루션이 작동합니다. 나를 위해. inferSchema는 완벽하게 작동합니다.
json 파일
~/Downloads ▶ cat test.json {"id": 1, "name":"abc", "depts":["dep01", "dep02"]}, {"id": 2, "name":"xyz", "depts" :["dep03"],"sal":100}
암호
scala> scc.read.format("json").option("inerSchema", true).load("Downloads/test.json").show() +--------------+---+----+----+ | depts| id|name| sal| +--------------+---+----+----+ |[dep01, dep02]| 1| abc|null| | [dep03]| 2| xyz| 100| +--------------+---+----+----+
-
==============================
2.json_data가 map 유형이라고 가정하면 (아닌 경우 항상 map으로 변환 할 수 있음) getItem을 사용할 수 있습니다.
json_data가 map 유형이라고 가정하면 (아닌 경우 항상 map으로 변환 할 수 있음) getItem을 사용할 수 있습니다.
df = spark.createDataFrame([ [1, {"name": "abc", "depts": ["dep01", "dep02"]}], [2, {"name": "xyz", "depts": ["dep03"], "sal": 100}] ], ['id', 'json_data'] ) df.select( df.id, df.json_data.getItem('name').alias('name'), df.json_data.getItem('depts').alias('depts'), df.json_data.getItem('sal').alias('sal') ).show() +---+----+--------------+----+ | id|name| depts| sal| +---+----+--------------+----+ | 1| abc|[dep01, dep02]|null| | 2| xyz| [dep03]| 100| +---+----+--------------+----+
열을 추출하는보다 동적 인 방법 :
cols = ['name', 'depts', 'sal'] df.select(df.id, *(df.json_data.getItem(col).alias(col) for col in cols)).show()
-
==============================
3.@Gaurang Shah의 답변을 바탕으로 중첩 JSON 구조를 처리하는 솔루션을 구현하고 monotonically_increasing_id (Non-sequential) 사용과 관련된 문제를 해결했습니다.
@Gaurang Shah의 답변을 바탕으로 중첩 JSON 구조를 처리하는 솔루션을 구현하고 monotonically_increasing_id (Non-sequential) 사용과 관련된 문제를 해결했습니다.
이 방법에서 'populateColumnName'함수는 StructType 열을 재귀 적으로 확인하고 열 이름을 채 웁니다.
'renameColumns'함수는 '.'을 바꾸어 열 이름을 바꿉니다. 중첩 된 json 필드를 식별하려면 '_'를 사용하십시오.
'addIndex'함수는 JSON 열을 구문 분석 한 후 데이터 프레임에 조인하기 위해 데이터 프레임에 인덱스를 추가합니다.
def flattenJSON(df : DataFrame, columnName: String) : DataFrame = { val indexCol = "internal_temp_id" def populateColumnName(col : StructField) : Array[String] = { col.dataType match { case struct: StructType => struct.fields.flatMap(populateColumnName).map(col.name + "." + _) case rest => Array(col.name) } } def renameColumns(name : String) : String = { if(name contains ".") { name + " as " + name.replaceAll("\\.", "_") } else name } def addIndex(df : DataFrame) : DataFrame = { // Append "rowid" column of type Long val newSchema = StructType(df.schema.fields ++ Array(StructField(indexCol, LongType, false))) // Zip on RDD level val rddWithId = df.rdd.zipWithIndex // Convert back to DataFrame spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema) } val dfWithID = addIndex(df) val jsonDF = df.select(columnName) val ds = jsonDF.rdd.map(_.getString(0)).toDS val parseDF = spark.read.option("inferSchema",true).json(ds) val columnNames = parseDF.schema.fields.flatMap(populateColumnName).map(renameColumns) var resultDF = parseDF.selectExpr(columnNames:_*) val jsonDFWithID = addIndex(resultDF) val joinDF = dfWithID.join(jsonDFWithID, indexCol).drop(indexCol) joinDF } val res = flattenJSON(jsonDF, "address")
from https://stackoverflow.com/questions/57779692/split-json-string-column-to-multiple-columns by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] Hive CLI에서 HIVE 스크립트를 실행하는 방법 (0) | 2019.09.10 |
---|---|
[HADOOP] ArrayWritable을 통해 반복-NoSuchMethodException (0) | 2019.09.10 |
[HADOOP] 쿼리 결과에 정수 고유 ID를 추가하는 방법-__efficiently__? (0) | 2019.09.10 |
[HADOOP] java를 사용하는 Titan-1.0.0 + Hbase-0.98.20의 원격 모드에서 연결 오류 (0) | 2019.09.10 |
[HADOOP] UserAgent 문자열에서 값을 제거하는 Java 또는 Pig 정규식 (0) | 2019.09.09 |