복붙노트

[SCALA] JSON에 행 스파크

SCALA

JSON에 행 스파크

나는 스파크 v.1.6 (사용 스칼라) dataframe에서 JSON을 생성하고 싶습니다. 나는 df.toJSON 일의 간단한 해결책이 있다는 것을 알고있다.

그러나, 내 문제는 조금 다른 보인다. 다음과 같은 열이있는 경우 A는 dataframe에 대한 고려 :

|  A  |     B     |  C1  |  C2  |    C3   |
-------------------------------------------
|  1  | test      |  ab  |  22  |  TRUE   |
|  2  | mytest    |  gh  |  17  |  FALSE  |

나는 종단 A dataframe과에서하고 싶은

|  A  |     B     |                        C                   |
----------------------------------------------------------------
|  1  | test      | { "c1" : "ab", "c2" : 22, "c3" : TRUE }    |
|  2  | mytest    | { "c1" : "gh", "c2" : 17, "c3" : FALSE }   |

여기서 C는 C1, C2, C3를 포함하는 JSON이다. 불행히도, 난 컴파일 시간에 내가 모르는 무엇을 (열 A와 항상 "고정"되어 B 제외)과 같은 dataframe의 모습.

나는이 필요한 이유에 관해서는 : 나는 결과를 주변에 보내기위한 Protobuf을 사용하고 있습니다. 불행하게도, 내 dataframe 때때로 예상하고 난 여전히 Protobuf를 통해 사람들을 보내는 것보다 더 많은 열이 있습니다,하지만 난 정의에 모든 열을 지정하지 않습니다.

이걸 어떻게 달성 할 수 있습니까?

해결법

  1. ==============================

    1.스파크 2.1이 사용 사례 (# ​​15354 참조)에 대한 기본 지원이 있어야합니다.

    스파크 2.1이 사용 사례 (# ​​15354 참조)에 대한 기본 지원이 있어야합니다.

    import org.apache.spark.sql.functions.to_json
    df.select(to_json(struct($"c1", $"c2", $"c3")))
    
  2. ==============================

    2.먼저 변환 C의 구조체로는 할 수 있습니다 :

    먼저 변환 C의 구조체로는 할 수 있습니다 :

    val dfStruct = df.select($"A", $"B", struct($"C1", $"C2", $"C3").alias("C"))
    

    이 구조는 JSONL가 이전과 toJSON를 사용하여 변환 할 수 있습니다 :

    dfStruct.toJSON.collect
    // Array[String] = Array(
    //   {"A":1,"B":"test","C":{"C1":"ab","C2":22,"C3":true}}, 
    //   {"A":2,"B":"mytest","C":{"C1":"gh","C2":17,"C3":false}})
    

    나는 하나의 열을 변환 할 수 있지만,이 중 하나를 개별적으로 변환 가입 또는 UDF에서 좋아 JSON 파서를 사용할 수있는 내장 된 방법을 인식하지입니다.

    case class C(C1: String, C2: Int, C3: Boolean)
    
    object CJsonizer {
      import org.json4s._
      import org.json4s.JsonDSL._
      import org.json4s.jackson.Serialization
      import org.json4s.jackson.Serialization.write
    
      implicit val formats = Serialization.formats(org.json4s.NoTypeHints)
    
      def toJSON(c1: String, c2: Int, c3: Boolean) = write(C(c1, c2, c3))
    }
    
    
    val cToJSON = udf((c1: String, c2: Int, c3: Boolean) => 
      CJsonizer.toJSON(c1, c2, c3))
    
    df.withColumn("c_json", cToJSON($"C1", $"C2", $"C3"))
    
  3. ==============================

    3.여기에는 JSON 파서는, 그리고 스키마에 적응하지 :

    여기에는 JSON 파서는, 그리고 스키마에 적응하지 :

    import org.apache.spark.sql.functions.{col, concat, concat_ws, lit}
    
    df.select(
      col(df.columns(0)),
      col(df.columns(1)),
      concat(
        lit("{"), 
        concat_ws(",",df.dtypes.slice(2, df.dtypes.length).map(dt => {
          val c = dt._1;
          val t = dt._2;
          concat(
            lit("\"" + c + "\":" + (if (t == "StringType") "\""; else "")  ),
            col(c),
            lit(if(t=="StringType") "\""; else "") 
          )
        }):_*), 
        lit("}")
      ) as "C"
    ).collect()
    
  4. ==============================

    4.나는 to_json 문제를 해결하려면이 명령을 사용합니다 :

    나는 to_json 문제를 해결하려면이 명령을 사용합니다 :

    output_df = (df.select(to_json(struct(col("*"))).alias("content")))
    
  5. from https://stackoverflow.com/questions/36157810/spark-row-to-json by cc-by-sa and MIT license