[SCALA] JSON에 행 스파크
SCALAJSON에 행 스파크
나는 스파크 v.1.6 (사용 스칼라) dataframe에서 JSON을 생성하고 싶습니다. 나는 df.toJSON 일의 간단한 해결책이 있다는 것을 알고있다.
그러나, 내 문제는 조금 다른 보인다. 다음과 같은 열이있는 경우 A는 dataframe에 대한 고려 :
| A | B | C1 | C2 | C3 |
-------------------------------------------
| 1 | test | ab | 22 | TRUE |
| 2 | mytest | gh | 17 | FALSE |
나는 종단 A dataframe과에서하고 싶은
| A | B | C |
----------------------------------------------------------------
| 1 | test | { "c1" : "ab", "c2" : 22, "c3" : TRUE } |
| 2 | mytest | { "c1" : "gh", "c2" : 17, "c3" : FALSE } |
여기서 C는 C1, C2, C3를 포함하는 JSON이다. 불행히도, 난 컴파일 시간에 내가 모르는 무엇을 (열 A와 항상 "고정"되어 B 제외)과 같은 dataframe의 모습.
나는이 필요한 이유에 관해서는 : 나는 결과를 주변에 보내기위한 Protobuf을 사용하고 있습니다. 불행하게도, 내 dataframe 때때로 예상하고 난 여전히 Protobuf를 통해 사람들을 보내는 것보다 더 많은 열이 있습니다,하지만 난 정의에 모든 열을 지정하지 않습니다.
이걸 어떻게 달성 할 수 있습니까?
해결법
-
==============================
1.스파크 2.1이 사용 사례 (# 15354 참조)에 대한 기본 지원이 있어야합니다.
스파크 2.1이 사용 사례 (# 15354 참조)에 대한 기본 지원이 있어야합니다.
import org.apache.spark.sql.functions.to_json df.select(to_json(struct($"c1", $"c2", $"c3")))
-
==============================
2.먼저 변환 C의 구조체로는 할 수 있습니다 :
먼저 변환 C의 구조체로는 할 수 있습니다 :
val dfStruct = df.select($"A", $"B", struct($"C1", $"C2", $"C3").alias("C"))
이 구조는 JSONL가 이전과 toJSON를 사용하여 변환 할 수 있습니다 :
dfStruct.toJSON.collect // Array[String] = Array( // {"A":1,"B":"test","C":{"C1":"ab","C2":22,"C3":true}}, // {"A":2,"B":"mytest","C":{"C1":"gh","C2":17,"C3":false}})
나는 하나의 열을 변환 할 수 있지만,이 중 하나를 개별적으로 변환 가입 또는 UDF에서 좋아 JSON 파서를 사용할 수있는 내장 된 방법을 인식하지입니다.
case class C(C1: String, C2: Int, C3: Boolean) object CJsonizer { import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.Serialization import org.json4s.jackson.Serialization.write implicit val formats = Serialization.formats(org.json4s.NoTypeHints) def toJSON(c1: String, c2: Int, c3: Boolean) = write(C(c1, c2, c3)) } val cToJSON = udf((c1: String, c2: Int, c3: Boolean) => CJsonizer.toJSON(c1, c2, c3)) df.withColumn("c_json", cToJSON($"C1", $"C2", $"C3"))
-
==============================
3.여기에는 JSON 파서는, 그리고 스키마에 적응하지 :
여기에는 JSON 파서는, 그리고 스키마에 적응하지 :
import org.apache.spark.sql.functions.{col, concat, concat_ws, lit} df.select( col(df.columns(0)), col(df.columns(1)), concat( lit("{"), concat_ws(",",df.dtypes.slice(2, df.dtypes.length).map(dt => { val c = dt._1; val t = dt._2; concat( lit("\"" + c + "\":" + (if (t == "StringType") "\""; else "") ), col(c), lit(if(t=="StringType") "\""; else "") ) }):_*), lit("}") ) as "C" ).collect()
-
==============================
4.나는 to_json 문제를 해결하려면이 명령을 사용합니다 :
나는 to_json 문제를 해결하려면이 명령을 사용합니다 :
output_df = (df.select(to_json(struct(col("*"))).alias("content")))
from https://stackoverflow.com/questions/36157810/spark-row-to-json by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] '이' ''와 어떤 유형 / 차이 명시 자기 참조 (0) | 2019.11.09 |
---|---|
[SCALA] `##`와`hashCode`의 차이점은 무엇입니까? (0) | 2019.11.09 |
[SCALA] 빈 / 널 필드 값 새로운 Dataframe 만들기 (0) | 2019.11.09 |
[SCALA] 스칼라에서 익명 함수에서 매개 변수 전에 암시 적 키워드 (0) | 2019.11.09 |
[SCALA] 어떻게 스파크에 dataframe에 JSON 문자열을 변환하는 (0) | 2019.11.09 |