복붙노트

[HADOOP] Hadoop - Mapper에서 직접 HBase에 쓰기

HADOOP

Hadoop - Mapper에서 직접 HBase에 쓰기

HBase에 결과물을 작성해야하는 일이 있습니다. 나는 정말로 감속기가 필요하지 않습니다. 삽입하려는 행은 매퍼에서 결정됩니다.

어떻게 이것을 달성하기 위해 TableOutputFormat을 사용할 수 있습니까? 모든 예에서 필자는 가정은 감속기가 Put을 만드는 것이고 TableMapper는 HBase 테이블을 읽는 것임을 알았습니다.

내 경우에는 입력 HDFS 출력은 특정 테이블에 넣어, 그 중 하나를 도울 수있는 TableMapReduceUtil에서 아무것도 찾을 수 없습니다.

저기서 저를 도울 수있는 예가 있습니까?

BTW, 새로운 Hadoop API를 사용하고 있습니다.

해결법

  1. ==============================

    1.이것은 파일에서 읽고 모든 행을 Hbase에 넣는 예제입니다. 이 예제는 "Hbase : The Definitive Guide"의 예제이며 저장소에서 찾을 수 있습니다. 컴퓨터에서 복제본을 보려면 다음과 같이하십시오.

    이것은 파일에서 읽고 모든 행을 Hbase에 넣는 예제입니다. 이 예제는 "Hbase : The Definitive Guide"의 예제이며 저장소에서 찾을 수 있습니다. 컴퓨터에서 복제본을 보려면 다음과 같이하십시오.

    git clone git://github.com/larsgeorge/hbase-book.git
    

    이 책에서 코드에 대한 모든 설명을 찾을 수도 있습니다. 그러나 무언가가 이해할 수 없다면 언제든지 물어보십시오.

    `    public class ImportFromFile {
         public static final String NAME = "ImportFromFile"; 
         public enum Counters { LINES }
    
         static class ImportMapper
         extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> { 
           private byte[] family = null;
           private byte[] qualifier = null;
    
           @Override
           protected void setup(Context context)
             throws IOException, InterruptedException {
             String column = context.getConfiguration().get("conf.column");
             byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
             family = colkey[0];
             if (colkey.length > 1) {
               qualifier = colkey[1];
             }
           }
    
           @Override
           public void map(LongWritable offset, Text line, Context context) 
           throws IOException {
              try {
               String lineString = line.toString();
               byte[] rowkey = DigestUtils.md5(lineString); 
               Put put = new Put(rowkey);
               put.add(family, qualifier, Bytes.toBytes(lineString)); 
               context.write(new ImmutableBytesWritable(rowkey), put);
               context.getCounter(Counters.LINES).increment(1);
             } catch (Exception e) {
               e.printStackTrace();
             }
           }
         }
    
         private static CommandLine parseArgs(String[] args) throws ParseException { 
           Options options = new Options();
           Option o = new Option("t", "table", true,
             "table to import into (must exist)");
           o.setArgName("table-name");
           o.setRequired(true);
           options.addOption(o);
           o = new Option("c", "column", true,
             "column to store row data into (must exist)");
           o.setArgName("family:qualifier");
           o.setRequired(true);
           options.addOption(o);
           o = new Option("i", "input", true,
             "the directory or file to read from");
           o.setArgName("path-in-HDFS");
           o.setRequired(true);
           options.addOption(o);
           options.addOption("d", "debug", false, "switch on DEBUG log level");
           CommandLineParser parser = new PosixParser();
           CommandLine cmd = null;
           try {
             cmd = parser.parse(options, args);
           } catch (Exception e) {
             System.err.println("ERROR: " + e.getMessage() + "\n");
             HelpFormatter formatter = new HelpFormatter();
             formatter.printHelp(NAME + " ", options, true);
             System.exit(-1);
           }
           return cmd;
         }
    
         public static void main(String[] args) throws Exception {
           Configuration conf = HBaseConfiguration.create();
           String[] otherArgs =
             new GenericOptionsParser(conf, args).getRemainingArgs(); 
           CommandLine cmd = parseArgs(otherArgs);
           String table = cmd.getOptionValue("t");
           String input = cmd.getOptionValue("i");
           String column = cmd.getOptionValue("c");
           conf.set("conf.column", column);
           Job job = new Job(conf, "Import from file " + input + " into table " + table); 
    
                job.setJarByClass(ImportFromFile.class);
           job.setMapperClass(ImportMapper.class);
           job.setOutputFormatClass(TableOutputFormat.class);
           job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
           job.setOutputKeyClass(ImmutableBytesWritable.class);
           job.setOutputValueClass(Writable.class);
           job.setNumReduceTasks(0); 
           FileInputFormat.addInputPath(job, new Path(input));
           System.exit(job.waitForCompletion(true) ? 0 : 1);
         }
        }`
    
  2. ==============================

    2.매퍼 출력을 쌍으로 출력하면됩니다. OutputFormat은 출력 키 - 값을 유지하는 방법 만 지정합니다. 반드시 핵심 가치가 감속기에서 나온다는 의미는 아닙니다. 매퍼에서 다음과 같은 작업을 수행해야합니다.

    매퍼 출력을 쌍으로 출력하면됩니다. OutputFormat은 출력 키 - 값을 유지하는 방법 만 지정합니다. 반드시 핵심 가치가 감속기에서 나온다는 의미는 아닙니다. 매퍼에서 다음과 같은 작업을 수행해야합니다.

    ... extends TableMapper<ImmutableBytesWritable, Put>() {
        ...
        ...
        context.write(<some key>, <some Put or Delete object>);
    }
    
  3. from https://stackoverflow.com/questions/11061854/hadoop-writing-to-hbase-directly-from-the-mapper by cc-by-sa and MIT license