[HADOOP] Spark에서 중첩 된 모음을 읽는 방법
HADOOPSpark에서 중첩 된 모음을 읽는 방법
나는 기둥 중 하나가있는 쪽매 테이블을 가지고있다.
Hive에서이 테이블에 대한 쿼리를 LATERAL VIEW 구문을 사용하여 실행할 수 있습니다.
이 테이블을 RDD로 읽는 법, 더 중요한 것은 Spark에서이 중첩 된 컬렉션을 필터링, 맵핑하는 등의 방법입니다.
Spark 설명서에서 이에 대한 참조를 찾을 수 없습니다. 어떤 정보라도 미리 감사드립니다!
추신. 펠트는 테이블에 몇 가지 통계를 제공하는 데 도움이 될 수 있습니다. 메인 테이블의 컬럼 수 ~ 600. 행 수 ~ 200m. 중첩 된 콜렉션의 "열"수 ~ 10. 중첩 수집의 평균 레코드 수 ~ 35.
해결법
-
==============================
1.중첩 수집의 경우 마술은 없습니다. 스파크는 RDD [(String, String)]와 RDD [(String, Seq [String])]와 같은 방식으로 처리합니다.
중첩 수집의 경우 마술은 없습니다. 스파크는 RDD [(String, String)]와 RDD [(String, Seq [String])]와 같은 방식으로 처리합니다.
그러나 파켓 파일에서 이러한 중첩 된 콜렉션을 읽는 것은 까다로울 수 있습니다.
스파크 셸 (1.3.1)의 예를 들어 보겠습니다.
scala> import sqlContext.implicits._ import sqlContext.implicits._ scala> case class Inner(a: String, b: String) defined class Inner scala> case class Outer(key: String, inners: Seq[Inner]) defined class Outer
쪽모이 세공 파일 작성 :
scala> val outers = sc.parallelize(List(Outer("k1", List(Inner("a", "b"))))) outers: org.apache.spark.rdd.RDD[Outer] = ParallelCollectionRDD[0] at parallelize at <console>:25 scala> outers.toDF.saveAsParquetFile("outers.parquet")
쪽모이 세공 파일을 읽으십시오 :
scala> import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.catalyst.expressions.Row scala> val dataFrame = sqlContext.parquetFile("outers.parquet") dataFrame: org.apache.spark.sql.DataFrame = [key: string, inners: array<struct<a:string,b:string>>] scala> val outers = dataFrame.map { row => | val key = row.getString(0) | val inners = row.getAs[Seq[Row]](1).map(r => Inner(r.getString(0), r.getString(1))) | Outer(key, inners) | } outers: org.apache.spark.rdd.RDD[Outer] = MapPartitionsRDD[8] at map at DataFrame.scala:848
중요한 부분은 row.getAs [Seq [Row]] (1)입니다. 구조체의 중첩 된 시퀀스의 내부 표현은 ArrayBuffer [Row]입니다. Seq [Row] 대신 수퍼 유형을 사용할 수 있습니다. 1은 외부 행의 열 색인입니다. 여기 getAs 메소드를 사용했지만 Spark의 최신 버전에는 대안이 있습니다. Row 특성의 소스 코드를 참조하십시오.
이제 RDD [Outer]를 사용하면 원하는 변형이나 동작을 적용 할 수 있습니다.
// Filter the outers outers.filter(_.inners.nonEmpty) // Filter the inners outers.map(outer => outer.copy(inners = outer.inners.filter(_.a == "a")))
우리는 parquet 파일을 읽기 위해서만 spark-SQL 라이브러리를 사용했다. 예를 들어 RDD에 매핑하기 전에 DataFrame에서 직접 원하는 열만 선택할 수 있습니다.
dataFrame.select('col1, 'col2).map { row => ... }
-
==============================
2.필자가 사용하고있는 Python 기반 답변을 제공 할 것이다. 나는 스칼라가 비슷한 것을 가지고 있다고 생각한다.
필자가 사용하고있는 Python 기반 답변을 제공 할 것이다. 나는 스칼라가 비슷한 것을 가지고 있다고 생각한다.
explode 함수는 Python API 문서에 따라 DataFrames의 중첩 배열을 처리하기 위해 Spark 1.4.0에 추가되었습니다.
테스트 데이터 프레임 생성 :
from pyspark.sql import Row df = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3]), Row(a=2, intlist=[4,5,6])]) df.show() ## +-+--------------------+ ## |a| intlist| ## +-+--------------------+ ## |1|ArrayBuffer(1, 2, 3)| ## |2|ArrayBuffer(4, 5, 6)| ## +-+--------------------+
폭발을 사용하여 목록 열을 평평하게 만듭니다.
from pyspark.sql.functions import explode df.select(df.a, explode(df.intlist)).show() ## +-+---+ ## |a|_c0| ## +-+---+ ## |1| 1| ## |1| 2| ## |1| 3| ## |2| 4| ## |2| 5| ## |2| 6| ## +-+---+
-
==============================
3.또 다른 접근법은 다음과 같은 패턴 일치를 사용하는 것입니다.
또 다른 접근법은 다음과 같은 패턴 일치를 사용하는 것입니다.
val rdd: RDD[(String, List[(String, String)]] = dataFrame.map(_.toSeq.toList match { case List(key: String, inners: Seq[Row]) => key -> inners.map(_.toSeq.toList match { case List(a:String, b: String) => (a, b) }).toList })
행에서 직접 패턴 일치를 패턴화할 수 있지만 몇 가지 이유로 실패 할 수 있습니다.
-
==============================
4.위의 답변은 모든 중대한 답변이며 다른 측면에서이 질문에 대처합니다. Spark SQL은 중첩 된 데이터에 액세스하는 데에도 매우 유용합니다.
위의 답변은 모든 중대한 답변이며 다른 측면에서이 질문에 대처합니다. Spark SQL은 중첩 된 데이터에 액세스하는 데에도 매우 유용합니다.
다음은 SQL에서 직접 explode ()를 사용하여 중첩 된 컬렉션을 쿼리하는 방법을 보여주는 예제입니다.
SELECT hholdid, tsp.person_seq_no FROM ( SELECT hholdid, explode(tsp_ids) as tsp FROM disc_mrt.unified_fact uf )
tsp_ids는 위의 외부 쿼리에서 선택하는 person_seq_no를 비롯한 많은 특성을 가진 구조체의 중첩입니다.
위의 내용은 Spark 2.0에서 테스트되었습니다. 나는 작은 테스트를했는데 Spark 1.6에서는 작동하지 않습니다. 이 질문은 Spark 2가 주변에 없었을 때 묻기 때문에이 답변은 중첩 된 구조를 다루는 데 사용할 수있는 옵션 목록에 멋지게 추가됩니다.
SQL 액세스를 위해 explode ()에서 JIRA를 해결하지 못했습니다 :
from https://stackoverflow.com/questions/30008127/how-to-read-a-nested-collection-in-spark by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] Spark 2.0은 'DirectParquetOutputCommitter'를 사용하지 않습니다. 어떻게 사용하지 않고 살 수 있습니까? (0) | 2019.07.06 |
---|---|
[HADOOP] 하위 쿼리를 작성하고 하이브에서 "In"절을 사용하는 방법 (0) | 2019.07.06 |
[HADOOP] Hadoop MapReduce의 Mapper / Reducer 설정 및 정리 방법 (0) | 2019.07.05 |
[HADOOP] 타임 스탬프에 따라 HDFS 디렉토리의 파일을 나열하는 hdfs 명령이 있습니까? (0) | 2019.07.05 |
[HADOOP] Hadoop 배포판의 차이점 (0) | 2019.07.05 |