복붙노트

[HADOOP] Hadoop의 TableInputFormat을 확장하여 타임 스탬프 키의 배포에 사용되는 접두어로 스캔합니다.

HADOOP

Hadoop의 TableInputFormat을 확장하여 타임 스탬프 키의 배포에 사용되는 접두어로 스캔합니다.

hbase 테이블의 키가 hotspot가 아니기 때문에 키를 배포하기 위해 1 바이트 무작위 접두사가있는 타임 스탬프입니다. 나는 테이블에 하나의 MapReduce를 범위와 함께 실행할 수 있도록 TableInputFormat을 확장하려고합니다. 가능한 모든 접두사 앞에 접두어가 붙어 있으므로 지정된 타임 스탬프 범위의 모든 범위가 스캔됩니다. 내 솔루션은 항상 마지막 접두사 (127)를 256 번 스캔하는 것처럼 보이지만 작동하지 않습니다. 모든 스캔에서 어떤 것이 공유되어야합니다.

내 코드는 아래와 같습니다. 어떤 아이디어?

public class PrefixedTableInputFormat extends TableInputFormat {

  @Override
  public List<InputSplit> getSplits(JobContext context)
    throws IOException {
    List<InputSplit> splits = new ArrayList<InputSplit>();
    Scan scan = getScan();
    byte startRow[] = scan.getStartRow(), stopRow[] = scan.getStopRow();
    byte prefixedStartRow[] = new byte[startRow.length+1];
    byte prefixedStopRow[] = new byte[stopRow.length+1];
    System.arraycopy(startRow, 0, prefixedStartRow, 1, startRow.length);
    System.arraycopy(stopRow, 0, prefixedStopRow, 1, stopRow.length);

    for (int prefix = -128; prefix < 128; prefix++) {
      prefixedStartRow[0] = (byte) prefix;
      prefixedStopRow[0] = (byte) prefix;
      scan.setStartRow(prefixedStartRow);
      scan.setStopRow(prefixedStopRow);
      setScan(scan);
      splits.addAll(super.getSplits(context));
    }

    return splits;
  }
}

  Configuration config = HBaseConfiguration.create();
  Job job = new Job(config, "Aggregate");
  job.setJarByClass(Aggregate.class);

  Scan scan = new Scan();
  scan.setStartRow("20120630".getBytes());
  scan.setStopRow("20120701".getBytes());
  scan.setCaching(500);
  scan.setCacheBlocks(false);

  TableMapReduceUtil.initTableMapperJob(
      "event",
      scan,
      Mapper.class,
      ImmutableBytesWritable.class,
      ImmutableBytesWritable.class,
      job,
      true,
      PrefixedTableInputFormat.class);
  TableMapReduceUtil.initTableReducerJob("event", Reducer.class, job);

해결법

  1. ==============================

    1.각 반복에서 스플릿의 전체 복사본을 만들어야합니다.

    각 반복에서 스플릿의 전체 복사본을 만들어야합니다.

    for (int prefix = -128; prefix < 128; prefix++) {
      prefixedStartRow[0] = (byte) prefix;
      prefixedStopRow[0] = (byte) prefix;
      scan.setStartRow(prefixedStartRow);
      scan.setStopRow(prefixedStopRow);
      setScan(scan);
    
      for (InputSplit subSplit : super.getSplits(context)) {
        splits.add((InputSplit) ReflectionUtils.copy(conf,
              (TableSplit) subSplit, new TableSplit());
      }
    }
    
  2. from https://stackoverflow.com/questions/11353911/extending-hadoops-tableinputformat-to-scan-with-a-prefix-used-for-distribution by cc-by-sa and MIT license