복붙노트

[HADOOP] Spark에서 중첩 된 모음을 읽는 방법

HADOOP

Spark에서 중첩 된 모음을 읽는 방법

나는 기둥 중 하나가있는 쪽매 테이블을 가지고있다.

Hive에서이 테이블에 대한 쿼리를 LATERAL VIEW 구문을 사용하여 실행할 수 있습니다.

이 테이블을 RDD로 읽는 법, 더 중요한 것은 Spark에서이 중첩 된 컬렉션을 필터링, 맵핑하는 등의 방법입니다.

Spark 설명서에서 이에 대한 참조를 찾을 수 없습니다. 어떤 정보라도 미리 감사드립니다!

추신. 펠트는 테이블에 몇 가지 통계를 제공하는 데 도움이 될 수 있습니다.  메인 테이블의 컬럼 수 ~ 600. 행 수 ~ 200m.  중첩 된 콜렉션의 "열"수 ~ 10. 중첩 수집의 평균 레코드 수 ~ 35.

해결법

  1. ==============================

    1.중첩 수집의 경우 마술은 없습니다. 스파크는 RDD [(String, String)]와 RDD [(String, Seq [String])]와 같은 방식으로 처리합니다.

    중첩 수집의 경우 마술은 없습니다. 스파크는 RDD [(String, String)]와 RDD [(String, Seq [String])]와 같은 방식으로 처리합니다.

    그러나 파켓 파일에서 이러한 중첩 된 콜렉션을 읽는 것은 까다로울 수 있습니다.

    스파크 셸 (1.3.1)의 예를 들어 보겠습니다.

    scala> import sqlContext.implicits._
    import sqlContext.implicits._
    
    scala> case class Inner(a: String, b: String)
    defined class Inner
    
    scala> case class Outer(key: String, inners: Seq[Inner])
    defined class Outer
    

    쪽모이 세공 파일 작성 :

    scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b")))))
    outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25
    
    scala> outers.toDF.saveAsParquetFile("outers.parquet")
    

    쪽모이 세공 파일을 읽으십시오 :

    scala> import org.apache.spark.sql.catalyst.expressions.Row
    import org.apache.spark.sql.catalyst.expressions.Row
    
    scala> val dataFrame = sqlContext.parquetFile("outers.parquet")
    dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>]   
    
    scala> val outers = dataFrame.map { row =>
         |   val key = row.getString(0)
         |   val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1)))
         |   Outer(key, inners)
         | }
    outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848
    

    중요한 부분은 row.getAs [Seq [Row]] (1)입니다. 구조체의 중첩 된 시퀀스의 내부 표현은 ArrayBuffer [Row]입니다. Seq [Row] 대신 수퍼 유형을 사용할 수 있습니다. 1은 외부 행의 열 색인입니다. 여기 getAs 메소드를 사용했지만 Spark의 최신 버전에는 대안이 있습니다. Row 특성의 소스 코드를 참조하십시오.

    이제 RDD [Outer]를 사용하면 원하는 변형이나 동작을 적용 할 수 있습니다.

    // Filter the outers
    outers.filter(_.inners.nonEmpty)
    
    // Filter the inners
    outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))
    

    우리는 parquet 파일을 읽기 위해서만 spark-SQL 라이브러리를 사용했다. 예를 들어 RDD에 매핑하기 전에 DataFrame에서 직접 원하는 열만 선택할 수 있습니다.

    dataFrame.select('col1, 'col2).map { row => ... }
    
  2. ==============================

    2.필자가 사용하고있는 Python 기반 답변을 제공 할 것이다. 나는 스칼라가 비슷한 것을 가지고 있다고 생각한다.

    필자가 사용하고있는 Python 기반 답변을 제공 할 것이다. 나는 스칼라가 비슷한 것을 가지고 있다고 생각한다.

    explode 함수는 Python API 문서에 따라 DataFrames의 중첩 배열을 처리하기 위해 Spark 1.4.0에 추가되었습니다.

    테스트 데이터 프레임 생성 :

    from pyspark.sql import Row
    
    df = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3]), Row(a=2, intlist=[4,5,6])])
    df.show()
    
    ## +-+--------------------+
    ## |a|             intlist|
    ## +-+--------------------+
    ## |1|ArrayBuffer(1, 2, 3)|
    ## |2|ArrayBuffer(4, 5, 6)|
    ## +-+--------------------+
    

    폭발을 사용하여 목록 열을 평평하게 만듭니다.

    from pyspark.sql.functions import explode
    
    df.select(df.a, explode(df.intlist)).show()
    
    ## +-+---+
    ## |a|_c0|
    ## +-+---+
    ## |1|  1|
    ## |1|  2|
    ## |1|  3|
    ## |2|  4|
    ## |2|  5|
    ## |2|  6|
    ## +-+---+
    
  3. ==============================

    3.또 다른 접근법은 다음과 같은 패턴 일치를 사용하는 것입니다.

    또 다른 접근법은 다음과 같은 패턴 일치를 사용하는 것입니다.

    val rdd: RDD[(String, List[(String, String)]] = dataFrame.map(_.toSeq.toList match { 
      case List(key: String, inners: Seq[Row]) => key -> inners.map(_.toSeq.toList match {
        case List(a:String, b: String) => (a, b)
      }).toList
    })
    

    행에서 직접 패턴 일치를 패턴화할 수 있지만 몇 가지 이유로 실패 할 수 있습니다.

  4. ==============================

    4.위의 답변은 모든 중대한 답변이며 다른 측면에서이 질문에 대처합니다. Spark SQL은 중첩 된 데이터에 액세스하는 데에도 매우 유용합니다.

    위의 답변은 모든 중대한 답변이며 다른 측면에서이 질문에 대처합니다. Spark SQL은 중첩 된 데이터에 액세스하는 데에도 매우 유용합니다.

    다음은 SQL에서 직접 explode ()를 사용하여 중첩 된 컬렉션을 쿼리하는 방법을 보여주는 예제입니다.

    SELECT hholdid, tsp.person_seq_no 
    FROM (  SELECT hholdid, explode(tsp_ids) as tsp 
            FROM disc_mrt.unified_fact uf
         )
    

    tsp_ids는 위의 외부 쿼리에서 선택하는 person_seq_no를 비롯한 많은 특성을 가진 구조체의 중첩입니다.

    위의 내용은 Spark 2.0에서 테스트되었습니다. 나는 작은 테스트를했는데 Spark 1.6에서는 작동하지 않습니다. 이 질문은 Spark 2가 주변에 없었을 때 묻기 때문에이 답변은 중첩 된 구조를 다루는 데 사용할 수있는 옵션 목록에 멋지게 추가됩니다.

    SQL 액세스를 위해 explode ()에서 JIRA를 해결하지 못했습니다 :

  5. from https://stackoverflow.com/questions/30008127/how-to-read-a-nested-collection-in-spark by cc-by-sa and MIT license