[HADOOP] HBase에 데이터를 프로그램 방식으로 대량로드하는 가장 빠른 방법은 무엇입니까?
HADOOPHBase에 데이터를 프로그램 방식으로 대량로드하는 가장 빠른 방법은 무엇입니까?
필자는 사용자 정의 파싱이 필요한 수백만 줄의 일반 텍스트 파일을 가지고 있으며 가능한 한 빨리 HBase 테이블에로드하려고합니다 (Hadoop 또는 HBase Java 클라이언트 사용).
현재의 솔루션은 Reduce 부분이없는 MapReduce 작업을 기반으로합니다. FileInputFormat을 사용하여 텍스트 파일을 읽으면 각 행이 Mapper 클래스의 map 메서드에 전달됩니다. 이 시점에서 라인은 문맥에 쓰여지는 Put 객체를 형성하기 위해 파싱됩니다. 그런 다음 TableOutputFormat은 Put 개체를 가져 와서 테이블에 삽입합니다.
이 솔루션을 사용하면 평균 삽입 속도가 초당 1,000 행으로 예상 한 것보다 적습니다. HBase 설정은 단일 서버에서 가상 분산 모드로 설정됩니다.
한 가지 재미있는 점은 1,000,000 개의 행을 삽입하는 동안 25 개의 매퍼 (작업)가 생성되지만 순차적으로 실행된다는 것입니다. 이게 정상인가?
현재 솔루션에 대한 코드는 다음과 같습니다.
public static class CustomMap extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
protected void map(LongWritable key, Text value, Context context) throws IOException {
Map<String, String> parsedLine = parseLine(value.toString());
Put row = new Put(Bytes.toBytes(parsedLine.get(keys[1])));
for (String currentKey : parsedLine.keySet()) {
row.add(Bytes.toBytes(currentKey),Bytes.toBytes(currentKey),Bytes.toBytes(parsedLine.get(currentKey)));
}
try {
context.write(new ImmutableBytesWritable(Bytes.toBytes(parsedLine.get(keys[1]))), row);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public int run(String[] args) throws Exception {
if (args.length != 2) {
return -1;
}
conf.set("hbase.mapred.outputtable", args[1]);
// I got these conf parameters from a presentation about Bulk Load
conf.set("hbase.hstore.blockingStoreFiles", "25");
conf.set("hbase.hregion.memstore.block.multiplier", "8");
conf.set("hbase.regionserver.handler.count", "30");
conf.set("hbase.regions.percheckin", "30");
conf.set("hbase.regionserver.globalMemcache.upperLimit", "0.3");
conf.set("hbase.regionserver.globalMemcache.lowerLimit", "0.15");
Job job = new Job(conf);
job.setJarByClass(BulkLoadMapReduce.class);
job.setJobName(NAME);
TextInputFormat.setInputPaths(job, new Path(args[0]));
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(CustomMap.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(TableOutputFormat.class);
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
Long startTime = Calendar.getInstance().getTimeInMillis();
System.out.println("Start time : " + startTime);
int errCode = ToolRunner.run(HBaseConfiguration.create(), new BulkLoadMapReduce(), args);
Long endTime = Calendar.getInstance().getTimeInMillis();
System.out.println("End time : " + endTime);
System.out.println("Duration milliseconds: " + (endTime-startTime));
System.exit(errCode);
}
해결법
-
==============================
1.MR에서 HBase로 데이터를로드하는 효율적인 방법을 찾으려는 시도와 비슷한 프로세스를 거쳤습니다. 내가 찾은 것은 HFileOutputFormat을 MR의 OutputFormatClass로 사용하는 것입니다.
MR에서 HBase로 데이터를로드하는 효율적인 방법을 찾으려는 시도와 비슷한 프로세스를 거쳤습니다. 내가 찾은 것은 HFileOutputFormat을 MR의 OutputFormatClass로 사용하는 것입니다.
아래는 작업을 생성해야하는 코드의 기초이며 데이터를 기록하는 Mapper 맵 기능입니다. 이것은 빠르다. 우리는 더 이상 그것을 사용하지 않기 때문에 나는 숫자를 가지고 있지 않지만, 분당 250 만 레코드 정도였습니다.
다음은 MapReduce 프로세스에서 데이터를 HBase에 넣을 작업을 생성하기 위해 작성한 (박탈 한) 함수입니다.
private Job createCubeJob(...) { //Build and Configure Job Job job = new Job(conf); job.setJobName(jobName); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(Put.class); job.setMapperClass(HiveToHBaseMapper.class);//Custom Mapper job.setJarByClass(CubeBuilderDriver.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(HFileOutputFormat.class); TextInputFormat.setInputPaths(job, hiveOutputDir); HFileOutputFormat.setOutputPath(job, cubeOutputPath); Configuration hConf = HBaseConfiguration.create(conf); hConf.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum); hConf.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClientPort); HTable hTable = new HTable(hConf, tableName); HFileOutputFormat.configureIncrementalLoad(job, hTable); return job; }
이것은 HiveToHBaseMapper 클래스의지도 함수입니다 (약간 편집 됨).
public void map(WritableComparable key, Writable val, Context context) throws IOException, InterruptedException { try{ Configuration config = context.getConfiguration(); String[] strs = val.toString().split(Constants.HIVE_RECORD_COLUMN_SEPARATOR); String family = config.get(Constants.CUBEBUILDER_CONFIGURATION_FAMILY); String column = strs[COLUMN_INDEX]; String Value = strs[VALUE_INDEX]; String sKey = generateKey(strs, config); byte[] bKey = Bytes.toBytes(sKey); Put put = new Put(bKey); put.add(Bytes.toBytes(family), Bytes.toBytes(column), (value <= 0) ? Bytes.toBytes(Double.MIN_VALUE) : Bytes.toBytes(value)); ImmutableBytesWritable ibKey = new ImmutableBytesWritable(bKey); context.write(ibKey, put); context.getCounter(CubeBuilderContextCounters.CompletedMapExecutions).increment(1); } catch(Exception e){ context.getCounter(CubeBuilderContextCounters.FailedMapExecutions).increment(1); } }
나는 이것이 당신을위한 Copy & Paste 솔루션이 될 수 없다고 확신한다. 분명히 여기에서 작업했던 데이터는 맞춤 처리가 필요하지 않았습니다. (이것은 MR 작업 이전에 수행되었습니다). 이 중 가장 중요한 것은 HFileOutputFormat입니다. 나머지는 제가 어떻게 사용했는지 보여주는 예입니다. :) 좋은 해결책을 얻으려는 탄탄한 길로 인도하기를 바랍니다. :
-
==============================
2.mapreduce.tasktracker.map.tasks.maximum 매개 변수 기본값은 2로 설정되어 노드에서 병렬로 실행할 수있는 최대 작업 수를 결정합니다. 변경하지 않는 한 각 노드에서 동시에 실행되는 2 개의 맵 작업이 표시되어야합니다.
mapreduce.tasktracker.map.tasks.maximum 매개 변수 기본값은 2로 설정되어 노드에서 병렬로 실행할 수있는 최대 작업 수를 결정합니다. 변경하지 않는 한 각 노드에서 동시에 실행되는 2 개의 맵 작업이 표시되어야합니다.
from https://stackoverflow.com/questions/8750764/what-is-the-fastest-way-to-bulk-load-data-into-hbase-programmatically by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 스파크 RDD - 항상 RAM에있는 파티션입니까? (0) | 2019.06.02 |
---|---|
[HADOOP] HDFS의 대형 블록 크기! 사용되지 않은 공간은 어떻게 계산됩니까? (0) | 2019.06.02 |
[HADOOP] hadoop 2.2.0 64 비트 설치가 시작되지만 시작할 수 없음 (0) | 2019.06.02 |
[HADOOP] 하둡지도 / 줄이기 대 내장지도 / 줄이기 (0) | 2019.06.02 |
[HADOOP] 데이터 노드가 시작되지 않았습니다. (0) | 2019.06.02 |