[HADOOP] HBase에 데이터를 프로그램 방식으로 대량로드하는 가장 빠른 방법은 무엇입니까?
필자는 사용자 정의 파싱이 필요한 수백만 줄의 일반 텍스트 파일을 가지고 있으며 가능한 한 빨리 HBase 테이블에로드하려고합니다 (Hadoop 또는 HBase Java 클라이언트 사용).
현재의 솔루션은 Reduce 부분이없는 MapReduce 작업을 기반으로합니다. FileInputFormat을 사용하여 텍스트 파일을 읽으면 각 행이 Mapper 클래스의 map 메서드에 전달됩니다. 이 시점에서 라인은 문맥에 쓰여지는 Put 객체를 형성하기 위해 파싱됩니다. 그런 다음 TableOutputFormat은 Put 개체를 가져 와서 테이블에 삽입합니다.
이 솔루션을 사용하면 평균 삽입 속도가 초당 1,000 행으로 예상 한 것보다 적습니다. HBase 설정은 단일 서버에서 가상 분산 모드로 설정됩니다.
한 가지 재미있는 점은 1,000,000 개의 행을 삽입하는 동안 25 개의 매퍼 (작업)가 생성되지만 순차적으로 실행된다는 것입니다. 이게 정상인가?
현재 솔루션에 대한 코드는 다음과 같습니다.
public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
protected void map(LongWritable key, Text value, Context context) throws IOException {
Map<String, String> parsedLine = parseLine(value.toString());
Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1])));
for (String currentKey : parsedLine.keySet()) {
try {
context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
public int run(String[] args) throws Exception {
if (args.length != 2) {
return -1;
conf.set("hbase.mapred.outputtable", args[1]);
// I got these conf parameters from a presentation about Bulk Load
conf.set("hbase.hstore.blockingStoreFiles", "25");
conf.set("hbase.hregion.memstore.block.multiplier", "8");
conf.set("hbase.regionserver.handler.count", "30");
conf.set("hbase.regions.percheckin", "30");
conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3");
conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15");
Job job = new Job(conf);
TextInputFormat.setInputPaths(job, new Path(args[0]));
return 0;
public static void main(String[] args) throws Exception {
Long startTime = Calendar.getInstance().getTimeInMillis();
System.out.println("Start time : " + startTime);
int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args);
Long endTime = Calendar.getInstance().getTimeInMillis();
System.out.println("End time : " + endTime);
System.out.println("Duration milliseconds: " + (endTime-startTime));
MR에서 HBase로 데이터를로드하는 효율적인 방법을 찾으려는 시도와 비슷한 프로세스를 거쳤습니다. 내가 찾은 것은 HFileOutputFormat을 MR의 OutputFormatClass로 사용하는 것입니다.
아래는 작업을 생성해야하는 코드의 기초이며 데이터를 기록하는 Mapper 맵 기능입니다. 이것은 빠르다. 우리는 더 이상 그것을 사용하지 않기 때문에 나는 숫자를 가지고 있지 않지만, 분당 250 만 레코드 정도였습니다.
다음은 MapReduce 프로세스에서 데이터를 HBase에 넣을 작업을 생성하기 위해 작성한 (박탈 한) 함수입니다.
private Job createCubeJob(...) { //Build and Configure Job Job job = new Job(conf); job.setJobName(jobName); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); job.setMapperClass(HiveToHBaseMapper.class);//Custom Mapper job.setJarByClass(CubeBuilderDriver.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(HFileOutputFormat.class); TextInputFormat.setInputPaths(job, hiveOutputDir); HFileOutputFormat.setOutputPath(job, cubeOutputPath); Configuration hConf = HBaseConfiguration.create(conf); hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum); hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort); HTable hTable = new HTable(hConf, tableName); HFileOutputFormat.configureIncrementalLoad(job, hTable); return job; }
이것은 HiveToHBaseMapper 클래스의지도 함수입니다 (약간 편집 됨).
public void map(WritableComparable key, Writable val, Context context) throws IOException, InterruptedException { try{ Configuration config = context.getConfiguration(); String[] strs = val.toString().split(Constants.HIVE_RECORD_COLUMN_SEPARATOR); String family = config.get(Constants.CUBEBUILDER_CONFIGURATION_FAMILY); String column = strs[COLUMN_INDEX]; String Value = strs[VALUE_INDEX]; String sKey = generateKey(strs, config); byte[] bKey = Bytes.toBytes(sKey); Put put = new Put(bKey); put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0) ? Bytes.toBytes(Double.MIN_VALUE) : Bytes.toBytes(value)); ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey); context.write(ibKey, put); context.getCounter(CubeBuilderContextCounters.CompletedMapExecutions).increment(1); } catch(Exception e){ context.getCounter(CubeBuilderContextCounters.FailedMapExecutions).increment(1); } }
나는 이것이 당신을위한 Copy & Paste 솔루션이 될 수 없다고 확신한다. 분명히 여기에서 작업했던 데이터는 맞춤 처리가 필요하지 않았습니다. (이것은 MR 작업 이전에 수행되었습니다). 이 중 가장 중요한 것은 HFileOutputFormat입니다. 나머지는 제가 어떻게 사용했는지 보여주는 예입니다. :) 좋은 해결책을 얻으려는 탄탄한 길로 인도하기를 바랍니다. :
mapreduce.tasktracker.map.tasks.maximum 매개 변수 기본값은 2로 설정되어 노드에서 병렬로 실행할 수있는 최대 작업 수를 결정합니다. 변경하지 않는 한 각 노드에서 동시에 실행되는 2 개의 맵 작업이 표시되어야합니다.
