복붙노트

[HADOOP] 하이브 조인 최적화

HADOOP

하이브 조인 최적화

하이브에서 처리하고 S3에 출력을 다시 저장해야하는 S3 버킷에 두 세트의 데이터가 저장되어 있습니다. 각 데이터 세트의 샘플 행은 다음과 같습니다.

DataSet 1: {"requestId":"TADS6152JHGJH5435", "customerId":"ASJHAGSJH","sessionId":"172356126"}

DataSet2: {"requestId":"TADS6152JHGJH5435","userAgent":"Mozilla"}

requestId를 기반으로이 두 데이터 세트를 결합하고 결합 된 행을 다음과 같이 출력해야합니다.

Output:  {"requestId":"TADS6152JHGJH5435", "customerId":"ASJHAGSJH","sessionId":"172356126","userAgent":"Mozilla"}

데이터 세트 1의 requestIds는 데이터 세트 2의 requestid 중 적절한 하위 세트입니다. LEFT OUTER JOIN을 사용하여 출력을 얻습니다. 다음은 하이브 스크립트의 단순화 된 버전입니다.

CREATE EXTERNAL TABLE dataset1 (
     requestId string,
     customerId string,
     sessionId string
 )
LOCATION 's3://path_to_dataset1/';

CREATE EXTERNAL TABLE dataset2 (
     requestId string,
     userAgent string
 )
LOCATION 's3://path_to_dataset2/';

CREATE EXTERNAL TABLE output (
     requestId string,
     customerId string,
     sessionId string,
     userAgent string
 )
LOCATION 's3://path_to_output/';

INSERT OVERWRITE TABLE output
  SELECT d1.requestId, d1.customerId, d1.sessionId, d2.userAgent
  FROM dataset1 d1 LEFT OUTER JOIN dataset2 d2
  ON (d1.requestId=d2.requestId);

내 질문은 :

이 조인을 최적화 할 수있는 기회가 있습니까? 테이블의 파티셔닝 / 버킷을 사용하여 조인을 더 빠르게 실행할 수 있습니까? 내 스크립트에서 hive.auto.convert.join을 true로 설정했습니다. 위 쿼리에 대해 더 나은 성능을 얻기 위해 설정해야하는 다른 하이브 속성은 무엇입니까?

해결법

  1. ==============================

    1.

    1. Optimize Joins
    

    자동 변환 맵 조인을 사용하고 왜곡 조인을 최적화하여 조인의 성능을 향상시킬 수 있습니다.

    Auto Map Joins
    

    자동 매핑 - 조인은 큰 표를 작은 표로 결합 할 때 매우 유용한 기능입니다. 이 기능을 사용하면 작은 테이블이 각 노드의 로컬 캐시에 저장되고 Map 단계의 큰 테이블과 조인됩니다. 자동 맵 결합을 사용하면 두 가지 이점이 있습니다. 먼저 작은 테이블을 캐시에로드하면 각 데이터 노드에서 읽기 시간이 절약됩니다. 두 번째로 조인 작업이 각 데이터 블록에 대해 Map 단계에서 이미 완료되었으므로 Hive 쿼리에서 왜곡 조인을 방지합니다.

    Skew Joins
    

    하이브 셸의 SET 명령이나 hive-site.xml 파일을 통해 hive.optimize.skewjoin 속성을 true로 설정하여 불균형 조인, 즉 불균형 조인을 최적화 할 수 있습니다.

      <property>
        <name>hive.optimize.skewjoin</name>
        <value>true</value>
        <description>
          Whether to enable skew join optimization. 
          The algorithm is as follows: At runtime, detect the keys with a large skew. Instead of
          processing those keys, store them temporarily in an HDFS directory. In a follow-up map-reduce
          job, process those skewed keys. The same key need not be skewed for all the tables, and so,
          the follow-up map-reduce job (for the skewed keys) would be much faster, since it would be a
          map-join.
        </description>
      </property>
      <property>
        <name>hive.skewjoin.key</name>
        <value>100000</value>
        <description>
          Determine if we get a skew key in join. If we see more than the specified number of rows with the same key in join operator,
          we think the key as a skew join key. 
        </description>
      </property>
      <property>
        <name>hive.skewjoin.mapjoin.map.tasks</name>
        <value>10000</value>
        <description>
          Determine the number of map task used in the follow up map join job for a skew join.
          It should be used together with hive.skewjoin.mapjoin.min.split to perform a fine grained control.
        </description>
      </property>
      <property>
        <name>hive.skewjoin.mapjoin.min.split</name>
        <value>33554432</value>
        <description>
          Determine the number of map task at most used in the follow up map join job for a skew join by specifying 
          the minimum split size. It should be used together with hive.skewjoin.mapjoin.map.tasks to perform a fine grained control.
        </description>
      </property>
    
    2. Enable Bucketed Map Joins
    

    테이블이 특정 열에 의해 버킷되고 이러한 테이블이 조인에 사용되면 버킷으로 맵핑 된 조인을 사용하여 성능을 향상시킬 수 있습니다.

      <property>
        <name>hive.optimize.bucketmapjoin</name>
        <value>true</value>
        <description>Whether to try bucket mapjoin</description>
      </property>
      <property>
        <name>hive.optimize.bucketmapjoin.sortedmerge</name>
        <value>true</value>
        <description>Whether to try sorted bucket merge map join</description>
      </property>
    

    .

    3. Enable Tez Execution Engine
    

    오래된 Map-reduce 엔진에서 Hive 쿼리를 실행하는 대신 Tez 실행 엔진에서 실행하여 최소 100 %에서 300 %까지 하이브 쿼리의 성능을 향상시킬 수 있습니다. 하이브 셸의 속성을 사용하여 Tez 엔진을 사용할 수 있습니다.

    hive> set hive.execution.engine=tez;
    

    .

    4. Enable Parallel Execution
    

    Hive는 쿼리를 하나 이상의 단계로 변환합니다. 스테이지는 MapReduce 스테이지, 샘플링 스테이지, 병합 스테이지, 제한 스테이지 일 수 있습니다. 기본적으로 Hive는이 단계를 한 번에 하나씩 실행합니다. 특정 작업은 서로에 의존하지 않고 실행될 수있는 몇 가지 단계로 구성 될 수 있습니다.

    병렬 작업을 통해 전체 작업을보다 신속하게 완료 할 수 있습니다. 병렬 실행은 아래 속성을 설정하여 활성화 할 수 있습니다.

      <property>
        <name>hive.exec.parallel</name>
        <value>true</value>
        <description>Whether to execute jobs in parallel</description>
      </property>
      <property>
        <name>hive.exec.parallel.thread.number</name>
        <value>8</value>
        <description>How many jobs at most can be executed in parallel</description>
      </property>
    

    .

    5. Enable Vectorization
    

    벡터화 기능은 하이브 -0.13.1 릴리스에서만 하이브에 처음 도입되었습니다. 벡터화 된 쿼리 실행을 통해 매번 단일 행 대신 1024 행의 일괄 처리를 한 번에 수행함으로써 스캔, 집계, 필터 및 조인과 같은 작업의 성능을 향상시킬 수 있습니다.

    하이브 셸 또는 hive-site.xml 파일에서 세 가지 속성을 설정하여 벡터화 된 쿼리를 실행할 수 있습니다.

    hive> set hive.vectorized.execution.enabled = true;
    hive> set hive.vectorized.execution.reduce.enabled = true;
    hive> set hive.vectorized.execution.reduce.groupby.enabled = true;
    

    .

    6. Enable Cost Based Optimization
    

    최근 Hive 릴리스는 비용 기반 최적화의 기능을 제공 했으므로 쿼리 비용에 따라 추가 최적화를 수행 할 수 있으므로 조인 순서를 지정하는 방법, 수행 할 조인 유형, 병렬 정도 등이 다를 수 있습니다.

    비용 기반 최적화는 hive-site.xml 파일의 아래 속성을 설정하여 활성화 할 수 있습니다.

      <property>
        <name>hive.cbo.enable</name>
        <value>true</value>
        <description>Flag to control enabling Cost Based Optimizations using Calcite framework.</description>
      </property>
      <property>
        <name>hive.compute.query.using.stats</name>
        <value>true</value>
        <description>
          When set to true Hive will answer a few queries like count(1) purely using stats
          stored in metastore. For basic stats collection turn on the config hive.stats.autogather to true.
          For more advanced stats collection need to run analyze table queries.
        </description>
      </property>
      <property>
        <name>hive.stats.fetch.partition.stats</name>
        <value>true</value>
        <description>
          Annotation of operator tree with statistics information requires partition level basic
          statistics like number of rows, data size and file size. Partition statistics are fetched from
          metastore. Fetching partition statistics for each needed partition can be expensive when the
          number of partitions is high. This flag can be used to disable fetching of partition statistics
          from metastore. When this flag is disabled, Hive will make calls to filesystem to get file sizes
          and will estimate the number of rows from row schema.
        </description>
      </property>
      <property>
        <name>hive.stats.fetch.column.stats</name>
        <value>true</value>
        <description>
          Annotation of operator tree with statistics information requires column statistics.
          Column statistics are fetched from metastore. Fetching column statistics for each needed column
          can be expensive when the number of columns is high. This flag can be used to disable fetching
          of column statistics from metastore.
        </description>
      </property>
      <property>
        <name>hive.stats.autogather</name>
        <value>true</value>
        <description>A flag to gather statistics automatically during the INSERT OVERWRITE command.</description>
      </property>
      <property>
        <name>hive.stats.dbclass</name>
        <value>fs</value>
        <description>
          Expects one of the pattern in [jdbc(:.*), hbase, counter, custom, fs].
          The storage that stores temporary Hive statistics. In filesystem based statistics collection ('fs'), 
          each task writes statistics it has collected in a file on the filesystem, which will be aggregated 
          after the job has finished. Supported values are fs (filesystem), jdbc:database (where database 
          can be derby, mysql, etc.), hbase, counter, and custom as defined in StatsSetupConst.java.
        </description>
      </property>
    
  2. from https://stackoverflow.com/questions/32370033/hive-join-optimization by cc-by-sa and MIT license