복붙노트

[HADOOP] 천천히 변화하는 차원 - 하이브의 SCD1 및 SCD2 구현

HADOOP

천천히 변화하는 차원 - 하이브의 SCD1 및 SCD2 구현

Hive (1.2.1)에서 SCD1 및 SCD2 구현을 찾고 있습니다. 하이브 (0.14) 이전에 SCD1 및 SCD2 테이블을로드하는 해결 방법을 알고 있습니다. 해결 방법으로 SCD1 및 SCD2를로드하는 링크는 다음과 같습니다. http://hortonworks.com/blog/four-step-strategy-incremental-updates-hive/

이제 Hive는 ACID 작업을 지원하므로 더 좋거나 직접적인 방법으로로드 할 수 있습니다.

해결법

  1. ==============================

    1.HDFS는 변경 불가능한 저장소이기 때문에 버전을로드하기 위해 버전 관리 데이터 및 기록 유지 (SCD2)가 기본 동작이어야한다고 주장 할 수 있습니다. 윈도우 기능을 사용하여 현재 상태 / 최신 값을 검색하는 Hadoop SQL 쿼리 엔진 (하이브, 임팔라, 드릴 등)에서 뷰를 생성 할 수 있습니다. 내 블로그 게시물에서 Hadoop의 차원 모델에 대해 자세히 알아볼 수 있습니다 (예 : 큰 차원과 사실 테이블을 처리하는 방법.

    HDFS는 변경 불가능한 저장소이기 때문에 버전을로드하기 위해 버전 관리 데이터 및 기록 유지 (SCD2)가 기본 동작이어야한다고 주장 할 수 있습니다. 윈도우 기능을 사용하여 현재 상태 / 최신 값을 검색하는 Hadoop SQL 쿼리 엔진 (하이브, 임팔라, 드릴 등)에서 뷰를 생성 할 수 있습니다. 내 블로그 게시물에서 Hadoop의 차원 모델에 대해 자세히 알아볼 수 있습니다 (예 : 큰 차원과 사실 테이블을 처리하는 방법.

  2. ==============================

    2.글쎄, 나는 두 임시 테이블을 사용하여 주위에 그것을 작동 :

    글쎄, 나는 두 임시 테이블을 사용하여 주위에 그것을 작동 :

        drop table if exists administrator_tmp1;
    drop table if exists administrator_tmp2;
    
    set hive.exec.dynamic.partition=true;
    set hive.exec.dynamic.partition.mode=nonstrict;
    
    --review_administrator
    CREATE TABLE if not exists review_administrator(
        admin_id bigint ,
        admin_name string,
        create_time string,
        email string ,
        password string,
        status_description string,
        token string ,
        expire_time string ,
        granter_user_id bigint ,
        admin_time string ,
        effect_start_date string ,
        effect_end_date string 
    )
    partitioned by (current_row_indicator string comment 'current, expired')
    stored as parquet;
    
    --tmp1 is used for saving origin data
    CREATE TABLE if not exists administrator_tmp1(
        admin_id bigint ,
        admin_name string,
        create_time string,
        email string ,
        password string ,
        status_description string ,
        token string ,
        expire_time string ,
        granter_user_id bigint ,
        admin_time string ,
        effect_start_date string ,
        effect_end_date string 
    )
    partitioned by (current_row_indicator string comment 'current, expired:')
    stored as parquet;
    
    --tmp2 saving the scd data
    CREATE TABLE if not exists administrator_tmp2(
        admin_id bigint ,
        admin_name string,
        create_time string,
        email string ,
        password string ,
        status_description string ,
        token string ,
        expire_time string ,
        granter_user_id bigint ,
        admin_time string ,
        effect_start_date string ,
        effect_end_date string 
    )
    partitioned by (current_row_indicator string comment 'current, expired')
    stored as parquet;
    
    --insert origin data into tmp1
    INSERT OVERWRITE TABLE administrator_tmp1 PARTITION(current_row_indicator)
    SELECT 
        user_id as admin_id,
        name as admin_name,
        time as create_time,
        email as email,
        password as password,
        status as status_description,
        token as token,
        expire_time as expire_time,
        admin_id as granter_user_id,
        admin_time as admin_time,
        '{{ ds }}' as effect_start_date,
        '9999-12-31' as effect_end_date,
        'current' as current_row_indicator
    FROM 
        ks_db_origin.gifshow_administrator_origin
    ;
    
    --insert scd data into tmp2
    --for the data unchanged
    INSERT INTO TABLE administrator_tmp2 PARTITION(current_row_indicator)
    SELECT
        t2.admin_id,
        t2.admin_name,
        t2.create_time,
        t2.email,
        t2.password,
        t2.status_description,
        t2.token,
        t2.expire_time,
        t2.granter_user_id,
        t2.admin_time,
        t2.effect_start_date,
        t2.effect_end_date as effect_end_date,
        t2.current_row_indicator
    FROM
        administrator_tmp1 t1
    INNER JOIN 
        (
            SELECT * FROM review_administrator 
            WHERE current_row_indicator = 'current'
        ) t2
    ON 
        t1.admin_id = t2.admin_id
    AND t1.admin_name = t2.admin_name
    AND t1.create_time = t2.create_time
    AND t1.email = t2.email
    AND t1.password = t2.password
    AND t1.status_description = t2.status_description
    AND t1.token = t2.token
    AND t1.expire_time = t2.expire_time
    AND t1.granter_user_id = t2.granter_user_id
    AND t1.admin_time = t2.admin_time
    ;
    
    --for the data changed , update the effect_end_date
    INSERT INTO TABLE administrator_tmp2 PARTITION(current_row_indicator)
    SELECT
        t2.admin_id,
        t2.admin_name,
        t2.create_time,
        t2.email,
        t2.password,
        t2.status_description,
        t2.token,
        t2.expire_time,
        t2.granter_user_id,
        t2.admin_time,
        t2.effect_start_date as effect_start_date,
        '{{ yesterday_ds }}' as effect_end_date,
        'expired' as current_row_indicator
    FROM
        administrator_tmp1 t1
    INNER JOIN 
        (
            SELECT * FROM review_administrator 
            WHERE current_row_indicator = 'current'
        ) t2
    ON 
        t1.admin_id = t2.admin_id
    WHERE NOT 
        (
            t1.admin_name = t2.admin_name
        AND t1.create_time = t2.create_time
        AND t1.email = t2.email
        AND t1.password = t2.password
        AND t1.status_description = t2.status_description
        AND t1.token = t2.token
        AND t1.expire_time = t2.expire_time
        AND t1.granter_user_id = t2.granter_user_id
        AND t1.admin_time = t2.admin_time
        )
    ;
    
    --for the changed data and the new data
    INSERT INTO TABLE administrator_tmp2 PARTITION(current_row_indicator)
    SELECT
        t1.admin_id,
        t1.admin_name,
        t1.create_time,
        t1.email,
        t1.password,
        t1.status_description,
        t1.token,
        t1.expire_time,
        t1.granter_user_id,
        t1.admin_time,
        t1.effect_start_date,
        t1.effect_end_date,
        t1.current_row_indicator
    FROM
        administrator_tmp1 t1
    LEFT OUTER JOIN 
        (
            SELECT * FROM review_administrator 
            WHERE current_row_indicator = 'current'
        ) t2
    ON 
        t1.admin_id = t2.admin_id
    AND t1.admin_name = t2.admin_name
    AND t1.create_time = t2.create_time
    AND t1.email = t2.email
    AND t1.password = t2.password
    AND t1.status_description = t2.status_description
    AND t1.token = t2.token
    AND t1.expire_time = t2.expire_time
    AND t1.granter_user_id = t2.granter_user_id
    AND t1.admin_time = t2.admin_time
    WHERE t2.admin_id IS NULL
    ;
    
    --for the data already marked by 'expired'
    INSERT INTO TABLE administrator_tmp2 PARTITION(current_row_indicator)
    SELECT
        t1.admin_id,
        t1.admin_name,
        t1.create_time,
        t1.email,
        t1.password,
        t1.status_description,
        t1.token,
        t1.expire_time,
        t1.granter_user_id,
        t1.admin_time,
        t1.effect_start_date,
        t1.effect_end_date,
        t1.current_row_indicator
    FROM
        review_administrator t1
    WHERE t1.current_row_indicator = 'expired'
    ;
    
    --populate the dim table
    INSERT OVERWRITE TABLE review_administrator PARTITION(current_row_indicator)
    SELECT
        t1.admin_id,
        t1.admin_name,
        t1.create_time,
        t1.email,
        t1.password,
        t1.status_description,
        t1.token,
        t1.expire_time,
        t1.granter_user_id,
        t1.admin_time,
        t1.effect_start_date,
        t1.effect_end_date,
        t1.current_row_indicator
    FROM
        administrator_tmp2 t1
    ;
    
    --drop the two temp table
    drop table administrator_tmp1;
    drop table administrator_tmp2;
    
    
    -- --example data
    -- --2017-01-01
    -- insert into table review_administrator PARTITION(current_row_indicator)
    -- SELECT '1','a','2016-12-31','a@ks.com','password','open','token1','2017-12-31',
    -- 0,'2017-12-31','2017-01-01','9999-12-31','current' 
    -- FROM default.sample_07 limit 1;
    
    -- --2017-01-02
    -- insert into table administrator_tmp1 PARTITION(current_row_indicator)
    -- SELECT '1','a','2016-12-31','a01@ks.com','password','open','token1','2017-12-31',
    -- 0,'2017-12-31','2017-01-02','9999-12-31','current' 
    -- FROM default.sample_07 limit 1;
    
    -- insert into table administrator_tmp1 PARTITION(current_row_indicator)
    -- SELECT '2','b','2016-12-31','a@ks.com','password','open','token1','2017-12-31',
    -- 0,'2017-12-31','2017-01-02','9999-12-31','current' 
    -- FROM default.sample_07 limit 1;
    
    -- --2017-01-03
    -- --id 1 is changed
    -- insert into table administrator_tmp1 PARTITION(current_row_indicator)
    -- SELECT '1','a','2016-12-31','a03@ks.com','password','open','token1','2017-12-31',
    -- 0,'2017-12-31','2017-01-03','9999-12-31','current' 
    -- FROM default.sample_07 limit 1;
    -- --id 2 is not changed at all
    -- insert into table administrator_tmp1 PARTITION(current_row_indicator)
    -- SELECT '2','b','2016-12-31','a@ks.com','password','open','token1','2017-12-31',
    -- 0,'2017-12-31','2017-01-03','9999-12-31','current' 
    -- FROM default.sample_07 limit 1;
    -- --id 3 is a new record
    -- insert into table administrator_tmp1 PARTITION(current_row_indicator)
    -- SELECT '3','c','2016-12-31','c@ks.com','password','open','token1','2017-12-31',
    -- 0,'2017-12-31','2017-01-03','9999-12-31','current' 
    -- FROM default.sample_07 limit 1;
    
    -- --now dim table will show you the right SCD.
    
  3. ==============================

    3.배타적 조인 방식을 사용하여 Hive에서 천천히 변화하는 차원 유형 2에 대한 자세한 구현 방법은 다음과 같습니다.

    배타적 조인 방식을 사용하여 Hive에서 천천히 변화하는 차원 유형 2에 대한 자세한 구현 방법은 다음과 같습니다.

    소스가 완전한 데이터 파일, 예전의 업데이트 된 레코드 및 새 레코드를 전송한다고 가정합니다.

    Steps-
    

    SCD 유형 2의보다 자세한 구현은 여기에서 찾을 수 있습니다.

    https://github.com/sahilbhange/slowly-changing-dimension

  4. ==============================

    4.SCD를 사용하여 데이터를 관리 할 때 다른 접근법을 사용했습니다.

    SCD를 사용하여 데이터를 관리 할 때 다른 접근법을 사용했습니다.

    이제 마법은 관련된 모든 열의 체크섬을 계산하지만 제어 열은 각 행에 대해 고유 한 지문을 만들어 계산됩니다. 지문 인쇄 (체크섬) 열은 가장 최근 세대와 비교하여 변경된 열이 있는지 확인하는 데 사용됩니다 (가장 최근 세대는 키, loaded_on 및 시퀀스를 기반으로 한 최신 데이터 상태를 기반으로합니다).

    이전 세대가 없거나 일일 업데이트에서 오는 행에 기록 파일이나 테이블 내에 새로운 행 (새로운 생성)을 생성해야 할 경우 그리고 현재 일일 업데이트에서 오는 행에는 변경 사항이 없으므로 이전 세대와 비교하여 차이가 없기 때문에 행을 만들 필요가 없습니다.

    필요한 논리 유형은 Apache Spark을 사용하여 빌드 할 수 있습니다. 단일 문에서 Spark에 데이터 유형의 여러 열을 연결하고 핑거 프린트에 사용되는 해시 값을 계산하도록 요청할 수 있습니다.

    모두 함께 지금 당신은 모든 데이터 소스를 받아 들일 것입니다 스파크를 기반으로 유틸리티를 개발할 수 잘 정리, 깨끗하고 느린 차원 인식 기록 파일, 테이블, ... 마지막으로, 오직 추가 업데이 트를!

  5. from https://stackoverflow.com/questions/37472146/slowly-changing-dimensions-scd1-and-scd2-implementation-in-hive by cc-by-sa and MIT license