복붙노트

[HADOOP] 스파이크 데이터 프레임을 하이브에 동적 파티션 테이블로 저장

HADOOP

스파이크 데이터 프레임을 하이브에 동적 파티션 테이블로 저장

csv 파일에서 데이터 프레임으로 읽는 샘플 응용 프로그램이 있습니다. 데이터 프레임은 방법을 사용하여 쪽모락 형식으로 하이브 테이블에 저장할 수 있습니다. df.saveAsTable (tablename, mode).

위의 코드는 정상적으로 작동하지만 매일 테이블에있는 creationdate (테이블의 열)를 기반으로 하이브 테이블을 동적으로 파티션하려는 많은 데이터가 있습니다.

데이터 프레임을 동적으로 파티션하고 하이브 창고에 저장할 수있는 방법이 있습니까? hivesqlcontext.sql을 사용하여 insert 문을 하드 코딩하는 것을 삼가고 싶습니다 (테이블 partittioin을 (date) ....로 삽입하십시오).

질문은 다음과 같은 확장 기능으로 간주 될 수 있습니다. DataFrame을 Hive에 직접 저장하는 방법?

어떤 도움을 많이 주시면 감사하겠습니다.

해결법

  1. ==============================

    1.나는 그것이 다음과 같이 작동한다고 믿는다.

    나는 그것이 다음과 같이 작동한다고 믿는다.

    df는 년, 월 및 기타 열이있는 데이터 프레임입니다.

    df.write.partitionBy('year', 'month').saveAsTable(...)
    

    또는

    df.write.partitionBy('year', 'month').insertInto(...)
    
  2. ==============================

    2.df.write (). mode (SaveMode.Append) .partitionBy ( "colname"). saveAsTable ( "Table")을 사용하여 파티션 된 하이브 테이블에 쓸 수있었습니다.

    df.write (). mode (SaveMode.Append) .partitionBy ( "colname"). saveAsTable ( "Table")을 사용하여 파티션 된 하이브 테이블에 쓸 수있었습니다.

    나는 다음과 같은 속성을 사용 가능하게 만들어야 만했습니다.

    hiveContext.setConf("hive.exec.dynamic.partition", "true")
    hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
    
  3. ==============================

    3.나는 똑같은 문제에 직면했지만 다음 트릭을 사용하여 해결했습니다.

    나는 똑같은 문제에 직면했지만 다음 트릭을 사용하여 해결했습니다.

  4. ==============================

    4.이것은 나를 위해 일하는 것입니다. 필자는 이러한 설정을 지정한 다음 데이터를 분할 된 테이블에 저장합니다.

    이것은 나를 위해 일하는 것입니다. 필자는 이러한 설정을 지정한 다음 데이터를 분할 된 테이블에 저장합니다.

    from pyspark.sql import HiveContext
    sqlContext = HiveContext(sc)
    sqlContext.setConf("hive.exec.dynamic.partition", "true")
    sqlContext.setConf("hive.exec.dynamic.partition.mode", 
    "nonstrict")
    
  5. ==============================

    5.이것은 파이썬과 스파크 2.1.0을 사용하여 나를 위해 일했다.

    이것은 파이썬과 스파크 2.1.0을 사용하여 나를 위해 일했다.

    이 작업을 수행하는 가장 좋은 방법인지 확실하지 않지만 작동하는 ...

    # WRITE DATA INTO A HIVE TABLE
    import pyspark
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
        .builder \
        .master("local[*]") \
        .config("hive.exec.dynamic.partition", "true") \
        .config("hive.exec.dynamic.partition.mode", "nonstrict") \
        .enableHiveSupport() \
        .getOrCreate()
    
    ### CREATE HIVE TABLE (with one row)
    spark.sql("""
    CREATE TABLE IF NOT EXISTS hive_df (col1 INT, col2 STRING, partition_bin INT)
    USING HIVE OPTIONS(fileFormat 'PARQUET')
    PARTITIONED BY (partition_bin)
    LOCATION 'hive_df'
    """)
    spark.sql("""
    INSERT INTO hive_df PARTITION (partition_bin = 0)
    VALUES (0, 'init_record')
    """)
    ###
    
    ### CREATE NON HIVE TABLE (with one row)
    spark.sql("""
    CREATE TABLE IF NOT EXISTS non_hive_df (col1 INT, col2 STRING, partition_bin INT)
    USING PARQUET
    PARTITIONED BY (partition_bin)
    LOCATION 'non_hive_df'
    """)
    spark.sql("""
    INSERT INTO non_hive_df PARTITION (partition_bin = 0)
    VALUES (0, 'init_record')
    """)
    ###
    
    ### ATTEMPT DYNAMIC OVERWRITE WITH EACH TABLE
    spark.sql("""
    INSERT OVERWRITE TABLE hive_df PARTITION (partition_bin)
    VALUES (0, 'new_record', 1)
    """)
    spark.sql("""
    INSERT OVERWRITE TABLE non_hive_df PARTITION (partition_bin)
    VALUES (0, 'new_record', 1)
    """)
    
    spark.sql("SELECT * FROM hive_df").show() # 2 row dynamic overwrite
    spark.sql("SELECT * FROM non_hive_df").show() # 1 row full table overwrite
    
  6. from https://stackoverflow.com/questions/31341498/save-spark-dataframe-as-dynamic-partitioned-table-in-hive by cc-by-sa and MIT license