복붙노트

[HADOOP] 집계 후 스파크가있는 하이브 테이블 읽기 및 쓰기

HADOOP

집계 후 스파크가있는 하이브 테이블 읽기 및 쓰기

우리는 하이브 창고가 있고 다양한 작업 (주로 분류)을 위해 스파크를 사용하고 싶었습니다. 때때로 하이브 테이블로 결과를 작성하십시오. 예를 들어, 다음 python 함수를 작성하여 original_table 열 2의 총 합계를 original_table 열 1로 그룹화했습니다. 이 함수는 작동하지만 비효율적인데, 특히 키 - 값 쌍으로 변환 할지도와 사전 버전이 걱정됩니다. 함수 결합 자, mergeValue, mergeCombiner는 다른 곳에 정의되어 있지만 잘 작동합니다.

from pyspark import HiveContext

rdd = HiveContext(sc).sql('from original_table select *')

#convert to key-value pairs
key_value_rdd = rdd.map(lambda x: (x[0], int(x[1])))

#create rdd where rows are (key, (sum, count)
combined = key_value_rdd.combineByKey(combiner, mergeValue, mergeCombiner)

# creates rdd with dictionary values in order to create schemardd
dict_rdd = combined.map(lambda x: {'k1': x[0], 'v1': x[1][0], 'v2': x[1][1]})

# infer the schema
schema_rdd = HiveContext(sc).inferSchema(dict_rdd)

# save
schema_rdd.saveAsTable('new_table_name')

같은 일을하는보다 효율적인 방법이 있습니까?

해결법

  1. ==============================

    1.... 질문이 작성되었을 때 이것이 가능하지 않았지만 createDataFrame () 호출을 사용하는 것이 현재 (1.3 이후) 이해가되지 않습니까?

    ... 질문이 작성되었을 때 이것이 가능하지 않았지만 createDataFrame () 호출을 사용하는 것이 현재 (1.3 이후) 이해가되지 않습니까?

    첫 번째 RDD를 얻은 후 호출 할 수있는 것처럼 보입니다. 그런 다음 구조에 대해 간단한 SQL 문을 실행하여 전체 작업을 한 번에 처리 할 수 ​​있습니다. (합계 및 그룹화) DataFrame 구조는 API 문서를 올바르게 읽으면 작성시 스키마를 직접 추론 할 수 있습니다.

    (http://spark.apache.org/docs/1.3.1/api/python/pyspark.sql.html#pyspark.sql.HiveContext)

  2. ==============================

    2.이 오류는 hive.exec.scratchdir을 사용자가 액세스 할 수있는 폴더로 설정하여 해결할 수 있습니다.

    이 오류는 hive.exec.scratchdir을 사용자가 액세스 할 수있는 폴더로 설정하여 해결할 수 있습니다.

  3. ==============================

    3.어떤 버전의 스파크를 사용하고 있습니까?

    어떤 버전의 스파크를 사용하고 있습니까?

    이 대답은 1.6 & 데이터 프레임을 사용하여 기반으로합니다.

    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    
    import sqlContext.implicits._
    val client = Seq((1, "A", 10), (2, "A", 5), (3, "B", 56)).toDF("ID", "Categ", "Amnt")
    
        import org.apache.spark.sql.functions._
        client.groupBy("Categ").agg(sum("Amnt").as("Sum"), count("ID").as("count")).show()
    
    
    +-----+---+-----+
    |Categ|Sum|count|
    +-----+---+-----+
    |    A| 15|    2|
    |    B| 56|    1|
    +-----+---+-----+
    

    희망이 도움이 !!

  4. from https://stackoverflow.com/questions/28412954/reading-and-writing-from-hive-tables-with-spark-after-aggregation by cc-by-sa and MIT license