[HADOOP] Hadoop의 TableInputFormat을 확장하여 타임 스탬프 키의 배포에 사용되는 접두어로 스캔합니다.
HADOOPHadoop의 TableInputFormat을 확장하여 타임 스탬프 키의 배포에 사용되는 접두어로 스캔합니다.
hbase 테이블의 키가 hotspot가 아니기 때문에 키를 배포하기 위해 1 바이트 무작위 접두사가있는 타임 스탬프입니다. 나는 테이블에 하나의 MapReduce를 범위와 함께 실행할 수 있도록 TableInputFormat을 확장하려고합니다. 가능한 모든 접두사 앞에 접두어가 붙어 있으므로 지정된 타임 스탬프 범위의 모든 범위가 스캔됩니다. 내 솔루션은 항상 마지막 접두사 (127)를 256 번 스캔하는 것처럼 보이지만 작동하지 않습니다. 모든 스캔에서 어떤 것이 공유되어야합니다.
내 코드는 아래와 같습니다. 어떤 아이디어?
public class PrefixedTableInputFormat extends TableInputFormat {
@Override
public List<InputSplit> getSplits(JobContext context)
throws IOException {
List<InputSplit> splits = new ArrayList<InputSplit>();
Scan scan = getScan();
byte startRow[] = scan.getStartRow(), stopRow[] = scan.getStopRow();
byte prefixedStartRow[] = new byte[startRow.length+1];
byte prefixedStopRow[] = new byte[stopRow.length+1];
System.arraycopy(startRow, 0, prefixedStartRow, 1, startRow.length);
System.arraycopy(stopRow, 0, prefixedStopRow, 1, stopRow.length);
for (int prefix = -128; prefix < 128; prefix++) {
prefixedStartRow[0] = (byte) prefix;
prefixedStopRow[0] = (byte) prefix;
scan.setStartRow(prefixedStartRow);
scan.setStopRow(prefixedStopRow);
setScan(scan);
splits.addAll(super.getSplits(context));
}
return splits;
}
}
과
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "Aggregate");
job.setJarByClass(Aggregate.class);
Scan scan = new Scan();
scan.setStartRow("20120630".getBytes());
scan.setStopRow("20120701".getBytes());
scan.setCaching(500);
scan.setCacheBlocks(false);
TableMapReduceUtil.initTableMapperJob(
"event",
scan,
Mapper.class,
ImmutableBytesWritable.class,
ImmutableBytesWritable.class,
job,
true,
PrefixedTableInputFormat.class);
TableMapReduceUtil.initTableReducerJob("event", Reducer.class, job);
해결법
-
==============================
1.각 반복에서 스플릿의 전체 복사본을 만들어야합니다.
각 반복에서 스플릿의 전체 복사본을 만들어야합니다.
for (int prefix = -128; prefix < 128; prefix++) { prefixedStartRow[0] = (byte) prefix; prefixedStopRow[0] = (byte) prefix; scan.setStartRow(prefixedStartRow); scan.setStopRow(prefixedStopRow); setScan(scan); for (InputSplit subSplit : super.getSplits(context)) { splits.add((InputSplit) ReflectionUtils.copy(conf, (TableSplit) subSplit, new TableSplit()); } }
from https://stackoverflow.com/questions/11353911/extending-hadoops-tableinputformat-to-scan-with-a-prefix-used-for-distribution by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 여러 매퍼가있는 다중 입력 경로에 대해 oozie 워크 플로를 구성하는 방법 (0) | 2019.06.11 |
---|---|
[HADOOP] Kerberos에 액세스하는 것은 SPnego없이 WebHDFS를 보호했습니다. (0) | 2019.06.11 |
[HADOOP] JAVA API를 사용하여 HDFS에서 파일을 이동하거나 복사하는 방법 (0) | 2019.06.11 |
[HADOOP] 오류 JA017로 인해 Oozie Workflow가 실패했습니다. (0) | 2019.06.11 |
[HADOOP] Hadoop MapReduce 작업을 실행할 때 Map의 키 / 값 입력으로 Filename / FileData 가져 오기 (0) | 2019.06.11 |