[HADOOP] HBase를 MapReduce의 소스로 사용할 때 TableInputFormatBase를 확장하여 각 영역에 대해 여러 개의 스플릿과 여러 맵퍼를 만들 수 있습니까?
HADOOPHBase를 MapReduce의 소스로 사용할 때 TableInputFormatBase를 확장하여 각 영역에 대해 여러 개의 스플릿과 여러 맵퍼를 만들 수 있습니까?
HBase를 MapReduce 작업 중 하나의 소스로 사용하려고합니다. 나는 TableInputFormat이 하나의 입력 분할 (따라서 하나의 매퍼)을 각 영역마다 지정한다는 것을 알고있다. 그러나 이것은 비효율적 인 것으로 보인다. 한 번에 주어진 지역에서 여러 매퍼를 작업하고 싶습니다. TableInputFormatBase를 확장하여이 작업을 수행 할 수 있습니까? 한 가지 예를 가르쳐 주시겠습니까? 게다가, 이것도 좋은 생각입니까?
도와 주셔서 감사합니다.
해결법
-
==============================
1.InputFormat를 확장하는 사용자 정의 입력 형식이 필요합니다. 당신은 어떻게 많은 데이터 (범위 기반 쿼리)를 스캔 할 것인가, 데이터를 쓰는 동안 스캔이 더 빨라질 수있는 모든 최적화가 무엇인지에 대한 질문에서 대답으로 어떻게 할 것인지 생각할 수 있습니다. 데이터 처리 시간이 데이터 검색 시간보다 더 큰 경우 이는 좋은 아이디어입니다.
InputFormat를 확장하는 사용자 정의 입력 형식이 필요합니다. 당신은 어떻게 많은 데이터 (범위 기반 쿼리)를 스캔 할 것인가, 데이터를 쓰는 동안 스캔이 더 빨라질 수있는 모든 최적화가 무엇인지에 대한 질문에서 대답으로 어떻게 할 것인지 생각할 수 있습니다. 데이터 처리 시간이 데이터 검색 시간보다 더 큰 경우 이는 좋은 아이디어입니다.
-
==============================
2.주어진 영역에 대해 여러 맵퍼를 지정할 수 있는지는 확실하지 않지만 다음을 고려하십시오.
주어진 영역에 대해 여러 맵퍼를 지정할 수 있는지는 확실하지 않지만 다음을 고려하십시오.
한 매퍼가 지역마다 비효율적이라고 생각하면 (아마도 데이터 노드에 #cpus와 같은 리소스가 충분하지 않을 수도 있습니다) 파일 hbase-site.xml에 더 작은 영역 크기를 지정할 수 있습니다.
다음을 변경하려는 경우 기본 configs 옵션 사이트가 있습니다. http://hbase.apache.org/configuration.html#hbase_default_configurations
지역 크기를 작게하면 DFS의 파일 수를 늘릴 수 있으며 namenode의 메모리에 따라 hadoop DFS의 용량을 제한 할 수 있습니다. namenode의 메모리 사용은 DFS의 파일 수와 직접적으로 관련되어 있음을 기억하십시오. 클러스터가 어떻게 사용되고 있는지 모르기 때문에 상황에 따라 다를 수도 있고 그렇지 않을 수도 있습니다. 이 질문에 대한 은색 탄환의 대답은 결코 없습니다!
-
==============================
3.1 . 키 집합이 매퍼간에 상호 배타적인지 확인하는 것은 절대적으로 좋습니다.
1 . 키 집합이 매퍼간에 상호 배타적인지 확인하는 것은 절대적으로 좋습니다.
-
==============================
4.이 MultipleScanTableInputFormat을 사용하면 MultipleScanTableInputFormat.PARTITIONS_PER_REGION_SERVER 구성을 사용하여 단일 regionserver에 대해 실행해야하는 매퍼 수를 제어 할 수 있습니다. 클래스는 모든 입력 스플릿을 해당 위치 (regionserver)별로 그룹화하며 RecordReader는 매퍼에 대한 모든 집계 된 스플릿을 적절히 반복합니다.
이 MultipleScanTableInputFormat을 사용하면 MultipleScanTableInputFormat.PARTITIONS_PER_REGION_SERVER 구성을 사용하여 단일 regionserver에 대해 실행해야하는 매퍼 수를 제어 할 수 있습니다. 클래스는 모든 입력 스플릿을 해당 위치 (regionserver)별로 그룹화하며 RecordReader는 매퍼에 대한 모든 집계 된 스플릿을 적절히 반복합니다.
여기에 예제가있다.
https://gist.github.com/bbeaudreault/9788499#file-multiplescantableinputformat-java-L90
그 작업은 단일 매퍼에 대해 여러 개의 집계 된 분할을 만들었습니다.
private List<InputSplit> getAggregatedSplits(JobContext context) throws IOException { final List<InputSplit> aggregatedSplits = new ArrayList<InputSplit>(); final Scan scan = getScan(); for (int i = 0; i < startRows.size(); i++) { scan.setStartRow(startRows.get(i)); scan.setStopRow(stopRows.get(i)); setScan(scan); aggregatedSplits.addAll(super.getSplits(context)); } // set the state back to where it was.. scan.setStopRow(null); scan.setStartRow(null); setScan(scan); return aggregatedSplits; }
지역 서버로 파티션 만들기
@Override public List<InputSplit> getSplits(JobContext context) throws IOException { List<InputSplit> source = getAggregatedSplits(context); if (!partitionByRegionServer) { return source; } // Partition by regionserver Multimap<String, TableSplit> partitioned = ArrayListMultimap.<String, TableSplit>create(); for (InputSplit split : source) { TableSplit cast = (TableSplit) split; String rs = cast.getRegionLocation(); partitioned.put(rs, cast); }
-
==============================
5.이것은 몇 개의 레코드 만 찾은 조건 스캔을 사용하여 넓은 영역 (수백만 행)을 스캔하려는 경우에 유용합니다. 이렇게하면 Scanner TimeoutException을 방지 할 수 있습니다.
이것은 몇 개의 레코드 만 찾은 조건 스캔을 사용하여 넓은 영역 (수백만 행)을 스캔하려는 경우에 유용합니다. 이렇게하면 Scanner TimeoutException을 방지 할 수 있습니다.
package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; public class RegionSplitTableInputFormat extends TableInputFormat { public static final String REGION_SPLIT = "region.split"; @Override public List<InputSplit> getSplits(JobContext context) throws IOException { Configuration conf = context.getConfiguration(); int regionSplitCount = conf.getInt(REGION_SPLIT, 0); List<InputSplit> superSplits = super.getSplits(context); if (regionSplitCount <= 0) { return superSplits; } List<InputSplit> splits = new ArrayList<InputSplit>(superSplits.size() * regionSplitCount); for (InputSplit inputSplit : superSplits) { TableSplit tableSplit = (TableSplit) inputSplit; System.out.println("splitting by " + regionSplitCount + " " + tableSplit); byte[] startRow0 = tableSplit.getStartRow(); byte[] endRow0 = tableSplit.getEndRow(); boolean discardLastSplit = false; if (endRow0.length == 0) { endRow0 = new byte[startRow0.length]; Arrays.fill(endRow0, (byte) 255); discardLastSplit = true; } byte[][] split = Bytes.split(startRow0, endRow0, regionSplitCount); if (discardLastSplit) { split[split.length - 1] = new byte[0]; } for (int regionSplit = 0; regionSplit < split.length - 1; regionSplit++) { byte[] startRow = split[regionSplit]; byte[] endRow = split[regionSplit + 1]; TableSplit newSplit = new TableSplit(tableSplit.getTableName(), startRow, endRow, tableSplit.getLocations()[0]); splits.add(newSplit); } } return splits; } }
from https://stackoverflow.com/questions/11039562/when-using-hbase-as-a-source-for-mapreduce-can-i-extend-tableinputformatbase-to by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] Hadoop MapReduce log4j - userlogs / job_ dir의 사용자 정의 파일에 메시지를 기록 하시겠습니까? (0) | 2019.08.04 |
---|---|
[HADOOP] classpath를 설정 한 후에 org.apache.hadoop.conf 패키지가 존재하지 않습니다. (0) | 2019.08.04 |
[HADOOP] hadoop 2.4.0에서 MapReduce 작업을 실행할 수 없습니다. (0) | 2019.08.04 |
[HADOOP] mapreduce 카운트 예제 (0) | 2019.08.04 |
[HADOOP] 주인에 의해 분리되고 제거 된 스파크 드라이버 (0) | 2019.08.04 |