[HADOOP] Hadoop - Mapper에서 직접 HBase에 쓰기
HADOOPHadoop - Mapper에서 직접 HBase에 쓰기
HBase에 결과물을 작성해야하는 일이 있습니다. 나는 정말로 감속기가 필요하지 않습니다. 삽입하려는 행은 매퍼에서 결정됩니다.
어떻게 이것을 달성하기 위해 TableOutputFormat을 사용할 수 있습니까? 모든 예에서 필자는 가정은 감속기가 Put을 만드는 것이고 TableMapper는 HBase 테이블을 읽는 것임을 알았습니다.
내 경우에는 입력 HDFS 출력은 특정 테이블에 넣어, 그 중 하나를 도울 수있는 TableMapReduceUtil에서 아무것도 찾을 수 없습니다.
저기서 저를 도울 수있는 예가 있습니까?
BTW, 새로운 Hadoop API를 사용하고 있습니다.
해결법
-
==============================
1.이것은 파일에서 읽고 모든 행을 Hbase에 넣는 예제입니다. 이 예제는 "Hbase : The Definitive Guide"의 예제이며 저장소에서 찾을 수 있습니다. 컴퓨터에서 복제본을 보려면 다음과 같이하십시오.
이것은 파일에서 읽고 모든 행을 Hbase에 넣는 예제입니다. 이 예제는 "Hbase : The Definitive Guide"의 예제이며 저장소에서 찾을 수 있습니다. 컴퓨터에서 복제본을 보려면 다음과 같이하십시오.
git clone git://github.com/larsgeorge/hbase-book.git
이 책에서 코드에 대한 모든 설명을 찾을 수도 있습니다. 그러나 무언가가 이해할 수 없다면 언제든지 물어보십시오.
` public class ImportFromFile { public static final String NAME = "ImportFromFile"; public enum Counters { LINES } static class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> { private byte[] family = null; private byte[] qualifier = null; @Override protected void setup(Context context) throws IOException, InterruptedException { String column = context.getConfiguration().get("conf.column"); byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column)); family = colkey[0]; if (colkey.length > 1) { qualifier = colkey[1]; } } @Override public void map(LongWritable offset, Text line, Context context) throws IOException { try { String lineString = line.toString(); byte[] rowkey = DigestUtils.md5(lineString); Put put = new Put(rowkey); put.add(family, qualifier, Bytes.toBytes(lineString)); context.write(new ImmutableBytesWritable(rowkey), put); context.getCounter(Counters.LINES).increment(1); } catch (Exception e) { e.printStackTrace(); } } } private static CommandLine parseArgs(String[] args) throws ParseException { Options options = new Options(); Option o = new Option("t", "table", true, "table to import into (must exist)"); o.setArgName("table-name"); o.setRequired(true); options.addOption(o); o = new Option("c", "column", true, "column to store row data into (must exist)"); o.setArgName("family:qualifier"); o.setRequired(true); options.addOption(o); o = new Option("i", "input", true, "the directory or file to read from"); o.setArgName("path-in-HDFS"); o.setRequired(true); options.addOption(o); options.addOption("d", "debug", false, "switch on DEBUG log level"); CommandLineParser parser = new PosixParser(); CommandLine cmd = null; try { cmd = parser.parse(options, args); } catch (Exception e) { System.err.println("ERROR: " + e.getMessage() + "\n"); HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(NAME + " ", options, true); System.exit(-1); } return cmd; } public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); CommandLine cmd = parseArgs(otherArgs); String table = cmd.getOptionValue("t"); String input = cmd.getOptionValue("i"); String column = cmd.getOptionValue("c"); conf.set("conf.column", column); Job job = new Job(conf, "Import from file " + input + " into table " + table); job.setJarByClass(ImportFromFile.class); job.setMapperClass(ImportMapper.class); job.setOutputFormatClass(TableOutputFormat.class); job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table); job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Writable.class); job.setNumReduceTasks(0); FileInputFormat.addInputPath(job, new Path(input)); System.exit(job.waitForCompletion(true) ? 0 : 1); } }`
-
==============================
2.매퍼 출력을 쌍으로 출력하면됩니다. OutputFormat은 출력 키 - 값을 유지하는 방법 만 지정합니다. 반드시 핵심 가치가 감속기에서 나온다는 의미는 아닙니다. 매퍼에서 다음과 같은 작업을 수행해야합니다.
매퍼 출력을 쌍으로 출력하면됩니다. OutputFormat은 출력 키 - 값을 유지하는 방법 만 지정합니다. 반드시 핵심 가치가 감속기에서 나온다는 의미는 아닙니다. 매퍼에서 다음과 같은 작업을 수행해야합니다.
... extends TableMapper<ImmutableBytesWritable, Put>() { ... ... context.write(<some key>, <some Put or Delete object>); }
from https://stackoverflow.com/questions/11061854/hadoop-writing-to-hbase-directly-from-the-mapper by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] Hadoop 2의 Hive 매퍼 수를 늘립니다. (0) | 2019.07.07 |
---|---|
[HADOOP] 싱크가 실패한 후에 Flume-NG가 강제로 이벤트 백 로그를 처리하도록하려면 어떻게해야합니까? (0) | 2019.07.07 |
[HADOOP] Hbase Java 예제를 실행하는 방법? (0) | 2019.07.07 |
[HADOOP] hadoop의 전역 변수 (0) | 2019.07.07 |
[HADOOP] Hadoop MapReduce : 감속기 수에 대한 설명 (0) | 2019.07.07 |