복붙노트

[HADOOP] HBase를 MapReduce의 소스로 사용할 때 TableInputFormatBase를 확장하여 각 영역에 대해 여러 개의 스플릿과 여러 맵퍼를 만들 수 있습니까?

HADOOP

HBase를 MapReduce의 소스로 사용할 때 TableInputFormatBase를 확장하여 각 영역에 대해 여러 개의 스플릿과 여러 맵퍼를 만들 수 있습니까?

HBase를 MapReduce 작업 중 하나의 소스로 사용하려고합니다. 나는 TableInputFormat이 하나의 입력 분할 (따라서 하나의 매퍼)을 각 영역마다 지정한다는 것을 알고있다. 그러나 이것은 비효율적 인 것으로 보인다. 한 번에 주어진 지역에서 여러 매퍼를 작업하고 싶습니다. TableInputFormatBase를 확장하여이 작업을 수행 할 수 있습니까? 한 가지 예를 가르쳐 주시겠습니까? 게다가, 이것도 좋은 생각입니까?

도와 주셔서 감사합니다.

해결법

  1. ==============================

    1.InputFormat를 확장하는 사용자 정의 입력 형식이 필요합니다. 당신은 어떻게 많은 데이터 (범위 기반 쿼리)를 스캔 할 것인가, 데이터를 쓰는 동안 스캔이 더 빨라질 수있는 모든 최적화가 무엇인지에 대한 질문에서 대답으로 어떻게 할 것인지 생각할 수 있습니다. 데이터 처리 시간이 데이터 검색 시간보다 더 큰 경우 이는 좋은 아이디어입니다.

    InputFormat를 확장하는 사용자 정의 입력 형식이 필요합니다. 당신은 어떻게 많은 데이터 (범위 기반 쿼리)를 스캔 할 것인가, 데이터를 쓰는 동안 스캔이 더 빨라질 수있는 모든 최적화가 무엇인지에 대한 질문에서 대답으로 어떻게 할 것인지 생각할 수 있습니다. 데이터 처리 시간이 데이터 검색 시간보다 더 큰 경우 이는 좋은 아이디어입니다.

  2. ==============================

    2.주어진 영역에 대해 여러 맵퍼를 지정할 수 있는지는 확실하지 않지만 다음을 고려하십시오.

    주어진 영역에 대해 여러 맵퍼를 지정할 수 있는지는 확실하지 않지만 다음을 고려하십시오.

    한 매퍼가 지역마다 비효율적이라고 생각하면 (아마도 데이터 노드에 #cpus와 같은 리소스가 충분하지 않을 수도 있습니다) 파일 hbase-site.xml에 더 작은 영역 크기를 지정할 수 있습니다.

    다음을 변경하려는 경우 기본 configs 옵션 사이트가 있습니다. http://hbase.apache.org/configuration.html#hbase_default_configurations

    지역 크기를 작게하면 DFS의 파일 수를 늘릴 수 있으며 namenode의 메모리에 따라 hadoop DFS의 용량을 제한 할 수 있습니다. namenode의 메모리 사용은 DFS의 파일 수와 직접적으로 관련되어 있음을 기억하십시오. 클러스터가 어떻게 사용되고 있는지 모르기 때문에 상황에 따라 다를 수도 있고 그렇지 않을 수도 있습니다. 이 질문에 대한 은색 탄환의 대답은 결코 없습니다!

  3. ==============================

    3.1 . 키 집합이 매퍼간에 상호 배타적인지 확인하는 것은 절대적으로 좋습니다.

    1 . 키 집합이 매퍼간에 상호 배타적인지 확인하는 것은 절대적으로 좋습니다.

  4. ==============================

    4.이 MultipleScanTableInputFormat을 사용하면 MultipleScanTableInputFormat.PARTITIONS_PER_REGION_SERVER 구성을 사용하여 단일 regionserver에 대해 실행해야하는 매퍼 수를 제어 할 수 있습니다. 클래스는 모든 입력 스플릿을 해당 위치 (regionserver)별로 그룹화하며 RecordReader는 매퍼에 대한 모든 집계 된 스플릿을 적절히 반복합니다.

    이 MultipleScanTableInputFormat을 사용하면 MultipleScanTableInputFormat.PARTITIONS_PER_REGION_SERVER 구성을 사용하여 단일 regionserver에 대해 실행해야하는 매퍼 수를 제어 할 수 있습니다. 클래스는 모든 입력 스플릿을 해당 위치 (regionserver)별로 그룹화하며 RecordReader는 매퍼에 대한 모든 집계 된 스플릿을 적절히 반복합니다.

    여기에 예제가있다.

    https://gist.github.com/bbeaudreault/9788499#file-multiplescantableinputformat-java-L90

    그 작업은 단일 매퍼에 대해 여러 개의 집계 된 분할을 만들었습니다.

    private List<InputSplit> getAggregatedSplits(JobContext context) throws IOException {
    final List<InputSplit> aggregatedSplits = new ArrayList<InputSplit>();
    
    final Scan scan = getScan();
    
    for (int i = 0; i < startRows.size(); i++) {
      scan.setStartRow(startRows.get(i));
      scan.setStopRow(stopRows.get(i));
    
      setScan(scan);
    
      aggregatedSplits.addAll(super.getSplits(context));
    }
    
    // set the state back to where it was.. 
    scan.setStopRow(null);
    scan.setStartRow(null);
    
    setScan(scan);
    
    return aggregatedSplits;
     }
    

    지역 서버로 파티션 만들기

     @Override
     public List<InputSplit> getSplits(JobContext context) throws IOException {
    List<InputSplit> source = getAggregatedSplits(context);
    
    if (!partitionByRegionServer) {
      return source;
    }
    
    // Partition by regionserver
    Multimap<String, TableSplit> partitioned = ArrayListMultimap.<String, TableSplit>create();
    for (InputSplit split : source) {
      TableSplit cast = (TableSplit) split;
      String rs = cast.getRegionLocation();
    
      partitioned.put(rs, cast);
    }
    
  5. ==============================

    5.이것은 몇 개의 레코드 만 찾은 조건 스캔을 사용하여 넓은 영역 (수백만 행)을 스캔하려는 경우에 유용합니다. 이렇게하면 Scanner TimeoutException을 방지 할 수 있습니다.

    이것은 몇 개의 레코드 만 찾은 조건 스캔을 사용하여 넓은 영역 (수백만 행)을 스캔하려는 경우에 유용합니다. 이렇게하면 Scanner TimeoutException을 방지 할 수 있습니다.

    package org.apache.hadoop.hbase.mapreduce;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    
    public class RegionSplitTableInputFormat extends TableInputFormat {
    
        public static final String REGION_SPLIT = "region.split";
    
        @Override
        public List<InputSplit> getSplits(JobContext context) throws IOException {
    
            Configuration conf = context.getConfiguration();
            int regionSplitCount = conf.getInt(REGION_SPLIT, 0);
            List<InputSplit> superSplits = super.getSplits(context);
            if (regionSplitCount <= 0) {
                return superSplits;
            }
    
            List<InputSplit> splits = new ArrayList<InputSplit>(superSplits.size() * regionSplitCount);
    
            for (InputSplit inputSplit : superSplits) {
                TableSplit tableSplit = (TableSplit) inputSplit;
                System.out.println("splitting by " + regionSplitCount + " " + tableSplit);
                byte[] startRow0 = tableSplit.getStartRow();
                byte[] endRow0 = tableSplit.getEndRow();
                boolean discardLastSplit = false;
                if (endRow0.length == 0) {
                    endRow0 = new byte[startRow0.length];
                    Arrays.fill(endRow0, (byte) 255);
                    discardLastSplit = true;
                }
                byte[][] split = Bytes.split(startRow0, endRow0, regionSplitCount);
                if (discardLastSplit) {
                    split[split.length - 1] = new byte[0];
                }
                for (int regionSplit = 0; regionSplit < split.length - 1; regionSplit++) {
                    byte[] startRow = split[regionSplit];
                    byte[] endRow = split[regionSplit + 1];
                    TableSplit newSplit = new TableSplit(tableSplit.getTableName(), startRow, endRow,
                            tableSplit.getLocations()[0]);
                    splits.add(newSplit);
                }
            }
    
            return splits;
        }
    }
    
  6. from https://stackoverflow.com/questions/11039562/when-using-hbase-as-a-source-for-mapreduce-can-i-extend-tableinputformatbase-to by cc-by-sa and MIT license