[HADOOP] 1 시간이 걸리는 hbase에 1GB 데이터로드
HADOOP1 시간이 걸리는 hbase에 1GB 데이터로드
Hbase에 1GB (10 백만 레코드) CSV 파일을로드하고 싶습니다. 이를 위해 Map-Reduce 프로그램을 작성했습니다. 내 코드가 제대로 작동하지만 완료하는 데 1 시간이 걸립니다. 마지막 감속기는 30 분 이상 시간이 걸립니다. 누구든지 나를 도울 수 있습니까?
내 코드는 다음과 같습니다.
Driver.Java
package com.cloudera.examples.hbase.bulkimport; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * HBase bulk import example
* Data preparation MapReduce job driver **
*/ public class Driver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); /* * NBA Final 2010 game 1 tip-off time (seconds from epoch) * Thu, 03 Jun 2010 18:00:00 PDT */ // conf.setInt("epoch.seconds.tipoff", 1275613200); conf.set("hbase.table.name", args[2]); // Load hbase-site.xml HBaseConfiguration.addHbaseResources(conf); Job job = new Job(conf, "HBase Bulk Import Example"); job.setJarByClass(HBaseKVMapper.class); job.setMapperClass(HBaseKVMapper.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); job.setMapOutputValueClass(KeyValue.class); job.setInputFormatClass(TextInputFormat.class); HTable hTable = new HTable(conf, args[2]); // Auto configure partitioner and reducer HFileOutputFormat.configureIncrementalLoad(job, hTable); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); // Load generated HFiles into table // LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); // loader.doBulkLoad(new Path(args[1]), hTable); } }- args[0]: HDFS input path *
- args[1]: HDFS output path *
- args[2]: HBase table name *
HColumnEnum.java
package com.cloudera.examples.hbase.bulkimport; /** * HBase table columns for the 'srv' column family */ public enum HColumnEnum { SRV_COL_employeeid ("employeeid".getBytes()), SRV_COL_eventdesc ("eventdesc".getBytes()), SRV_COL_eventdate ("eventdate".getBytes()), SRV_COL_objectname ("objectname".getBytes()), SRV_COL_objectfolder ("objectfolder".getBytes()), SRV_COL_ipaddress ("ipaddress".getBytes()); private final byte[] columnName; HColumnEnum (byte[] column) { this.columnName = column; } public byte[] getColumnName() { return this.columnName; } }
HBaseKVMapper.java
package com.cloudera.examples.hbase.bulkimport;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import au.com.bytecode.opencsv.CSVParser;
/**
* HBase bulk import example
* <p>
* Parses Facebook and Twitter messages from CSV files and outputs
* <ImmutableBytesWritable, KeyValue>.
* <p>
* The ImmutableBytesWritable key is used by the TotalOrderPartitioner to map it
* into the correct HBase table region.
* <p>
* The KeyValue value holds the HBase mutation information (column family,
* column, and value)
*/
public class HBaseKVMapper extends
Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
final static byte[] SRV_COL_FAM = "srv".getBytes();
final static int NUM_FIELDS = 6;
CSVParser csvParser = new CSVParser();
int tipOffSeconds = 0;
String tableName = "";
// DateTimeFormatter p = DateTimeFormat.forPattern("MMM dd, yyyy HH:mm:ss")
// .withLocale(Locale.US).withZone(DateTimeZone.forID("PST8PDT"));
ImmutableBytesWritable hKey = new ImmutableBytesWritable();
KeyValue kv;
/** {@inheritDoc} */
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
Configuration c = context.getConfiguration();
// tipOffSeconds = c.getInt("epoch.seconds.tipoff", 0);
tableName = c.get("hbase.table.name");
}
/** {@inheritDoc} */
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
/*if (value.find("Service,Term,") > -1) {
// Skip header
return;
}*/
String[] fields = null;
try {
fields = value.toString().split(",");
//csvParser.parseLine(value.toString());
} catch (Exception ex) {
context.getCounter("HBaseKVMapper", "PARSE_ERRORS").increment(1);
return;
}
if (fields.length != NUM_FIELDS) {
context.getCounter("HBaseKVMapper", "INVALID_FIELD_LEN").increment(1);
return;
}
// Get game offset in seconds from tip-off
/* DateTime dt = null;
try {
dt = p.parseDateTime(fields[9]);
} catch (Exception ex) {
context.getCounter("HBaseKVMapper", "INVALID_DATE").increment(1);
return;
}
int gameOffset = (int) ((dt.getMillis() / 1000) - tipOffSeconds);
String offsetForKey = String.format("%04d", gameOffset);
String username = fields[2];
if (username.equals("")) {
username = fields[3];
}*/
// Key: e.g. "1200:twitter:jrkinley"
hKey.set(String.format("%s|%s|%s|%s|%s|%s", fields[0], fields[1], fields[2],fields[3],fields[4],fields[5])
.getBytes());
// Service columns
if (!fields[0].equals("")) {
kv = new KeyValue(hKey.get(), SRV_COL_FAM,
HColumnEnum.SRV_COL_employeeid.getColumnName(), fields[0].getBytes());
context.write(hKey, kv);
}
if (!fields[1].equals("")) {
kv = new KeyValue(hKey.get(), SRV_COL_FAM,
HColumnEnum.SRV_COL_eventdesc.getColumnName(), fields[1].getBytes());
context.write(hKey, kv);
}
if (!fields[2].equals("")) {
kv = new KeyValue(hKey.get(), SRV_COL_FAM,
HColumnEnum.SRV_COL_eventdate.getColumnName(), fields[2].getBytes());
context.write(hKey, kv);
}
if (!fields[3].equals("")) {
kv = new KeyValue(hKey.get(), SRV_COL_FAM,
HColumnEnum.SRV_COL_objectname.getColumnName(), fields[3].getBytes());
context.write(hKey, kv);
}
if (!fields[4].equals("")) {
kv = new KeyValue(hKey.get(), SRV_COL_FAM,
HColumnEnum.SRV_COL_objectfolder.getColumnName(), fields[4].getBytes());
context.write(hKey, kv);
}
if (!fields[5].equals("")) {
kv = new KeyValue(hKey.get(), SRV_COL_FAM,
HColumnEnum.SRV_COL_ipaddress.getColumnName(), fields[5].getBytes());
context.write(hKey, kv);
}
context.getCounter("HBaseKVMapper", "NUM_MSGS").increment(1);
/*
* Output number of messages per quarter and before/after game. This should
* correspond to the number of messages per region in HBase
*/
/* if (gameOffset < 0) {
context.getCounter("QStats", "BEFORE_GAME").increment(1);
} else if (gameOffset < 900) {
context.getCounter("QStats", "Q1").increment(1);
} else if (gameOffset < 1800) {
context.getCounter("QStats", "Q2").increment(1);
} else if (gameOffset < 2700) {
context.getCounter("QStats", "Q3").increment(1);
} else if (gameOffset < 3600) {
context.getCounter("QStats", "Q4").increment(1);
} else {
context.getCounter("QStats", "AFTER_GAME").increment(1);
}*/
}
}
해결법
-
==============================
1.첫째, 왜 작은 파일 (1GB)을 위해 Hbase에 데이터를로드하기 위해 Mapreduce 프로그램이 필요한가?
첫째, 왜 작은 파일 (1GB)을 위해 Hbase에 데이터를로드하기 위해 Mapreduce 프로그램이 필요한가?
내 경험으로 잭슨 스트리밍을 사용하여 5GB Json을 처리했으며 (메모리에 모든 json을 넣고 싶지는 않음) 배치 기법을 사용하여 8 분 내에 Hbase에서 지속되었습니다.
hbase를 100000 레코드의 배치 목록 객체에 넣었습니다.
아래는 이것을 달성 한 코드 스 니펫입니다. 다른 형식도 구문 분석하는 동안 동일한 작업을 수행 할 수 있음)
이 방법을 두 곳에서 호출해야 할 수도 있습니다.
1) 100000 레코드의 일괄 처리.
2) 배치 기록 처리 알림의 경우 100000 미만
public void addRecord(final ArrayList<Put> puts, final String tableName) throws Exception { try { final HTable table = new HTable(HBaseConnection.getHBaseConfiguration(), getTable(tableName)); table.put(puts); LOG.info("INSERT record[s] " + puts.size() + " to table " + tableName + " OK."); } catch (final Throwable e) { e.printStackTrace(); } finally { LOG.info("Processed ---> " + puts.size()); if (puts != null) { puts.clear(); } } }
-
==============================
2.매퍼 클래스 만 만들고 hbase 출력 형식 클래스를 사용합니다. 이제 10 분이 걸렸습니다. 네트워크 속도가 너무 느려서 시간이 오래 걸립니다.
매퍼 클래스 만 만들고 hbase 출력 형식 클래스를 사용합니다. 이제 10 분이 걸렸습니다. 네트워크 속도가 너무 느려서 시간이 오래 걸립니다.
-
==============================
3.Hbase 테이블을 작성하는 동안 사용할 Region 분할 수를 지정하여 세부적으로 조정할 수 있습니다. 벌크 로딩을위한 리듀서 인스턴스의 수는 리전 수에도 의존합니다. 이것은 다음 명령으로 수행 할 수 있습니다
Hbase 테이블을 작성하는 동안 사용할 Region 분할 수를 지정하여 세부적으로 조정할 수 있습니다. 벌크 로딩을위한 리듀서 인스턴스의 수는 리전 수에도 의존합니다. 이것은 다음 명령으로 수행 할 수 있습니다
hbase org.apache.hadoop.hbase.util.RegionSplitter -c <number of regions> -f <column families> <New Hbase Table Name> <splitAlgorithm>
분할 알고리즘에 대해 지정할 수 있습니다.
from https://stackoverflow.com/questions/23421818/loading-1gb-data-into-hbase-taking-1-hour by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 하이브에서 sysdate -1 가져 오기 (0) | 2019.08.06 |
---|---|
[HADOOP] HDFS에서 Hive 테이블로 데이터 가져 오기 (0) | 2019.08.06 |
[HADOOP] Apache Giraph - 한 번에 하나의 작업 만 있기 때문에 분할 마스터 / 작업자 모드에서 실행할 수 없습니다. (0) | 2019.08.06 |
[HADOOP] oozie 작업에서 다중 libpath를 어떻게 지정합니까? (0) | 2019.08.06 |
[HADOOP] textinputformat.record.delimiter를 하이브 cli / beeline 내에서 기본값으로 재설정하는 방법은 무엇입니까? (0) | 2019.08.06 |