[HADOOP] Example of running MapReduce on HDFS files and storing the reducer results in an HBase table

Can someone point me to a good example of HBase with MapReduce? My requirement is to run MapReduce on an HDFS file and store the reducer output in an HBase table. The mapper input will be an HDFS file, and its output will be Text, IntWritable key-value pairs. The reducer output will be a Put object, i.e. the reducer adds up the Iterable IntWritable values and stores the result in an HBase table.

Solutions

  1. Here is the code which will solve your problem:

    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "JOB_NAME");
    job.setJarByClass(yourclass.class);
    job.setMapperClass(yourMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    FileInputFormat.setInputPaths(job, new Path(inputPath));
    TableMapReduceUtil.initTableReducerJob(TABLE, yourReducer.class, job); // TABLE = target HBase table name
    job.setReducerClass(yourReducer.class);
    job.waitForCompletion(true);

    class yourMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // @Override map()
    }

    class yourReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        // @Override reduce()
    }
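
    The map() and reduce() bodies are left as comments in the skeleton above. Below is a rough, word-count style sketch of what they could look like for the asker's requirement (sum the IntWritable values and write them to HBase as a Put). The column family "cf" and qualifier "count" are placeholder names, and Put.addColumn is the HBase 1.x+ method (older clients use Put.add).

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableReducer;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    class yourMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // emit Text / IntWritable pairs, one per token of the input line
            for (String token : value.toString().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }

    class yourReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // add up the IntWritable values for this key
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            // build the Put: row key = mapper key, placeholder family/qualifier
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(sum));
            context.write(new ImmutableBytesWritable(put.getRow()), put);
        }
    }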
    
  2. **Check the code below, which worked well for me with Phoenix HBase and map reduce.**

    This program reads data from an HBase table and, after the map-reduce job, inserts the results into another table.

    Tables: STOCK, STOCK_STATS
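
    The answer does not include the DDL for these two Phoenix tables. As a rough sketch only, the following creates a schema that would match the columns used in the code below (STOCK_NAME, RECORDING_YEAR, RECORDINGS_QUARTER, MAX_RECORDING); the key structure and column types are assumptions, and zkQuorum is a placeholder for the cluster's ZooKeeper quorum.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class CreateStockTables {
        public static void main(String[] args) throws Exception {
            // zkQuorum = ZooKeeper quorum of the HBase cluster, e.g. "localhost:2181"
            String zkQuorum = args[0];
            Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:" + zkQuorum);
                 Statement stmt = conn.createStatement()) {
                // source table read by the mapper (assumed key and types)
                stmt.execute("CREATE TABLE IF NOT EXISTS STOCK ("
                        + " STOCK_NAME VARCHAR NOT NULL,"
                        + " RECORDING_YEAR INTEGER NOT NULL,"
                        + " RECORDINGS_QUARTER DOUBLE ARRAY,"
                        + " CONSTRAINT pk PRIMARY KEY (STOCK_NAME, RECORDING_YEAR))");
                // target table written by the reducer (assumed types)
                stmt.execute("CREATE TABLE IF NOT EXISTS STOCK_STATS ("
                        + " STOCK_NAME VARCHAR NOT NULL PRIMARY KEY,"
                        + " MAX_RECORDING DOUBLE)");
                conn.commit();
            }
        }
    }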

    StockComputationJob.java

    public class StockComputationJob {

        public static class StockMapper extends Mapper<NullWritable, StockWritable, Text, DoubleWritable> {

            private Text stock = new Text();
            private DoubleWritable price = new DoubleWritable();

            @Override
            protected void map(NullWritable key, StockWritable stockWritable, Context context) throws IOException, InterruptedException {
                double[] recordings = stockWritable.getRecordings();
                final String stockName = stockWritable.getStockName();
                System.out.println("Map-" + recordings);
                double maxPrice = Double.MIN_VALUE;
                for (double recording : recordings) {
                    System.out.println("M-" + key + "-" + recording);
                    if (maxPrice < recording) {
                        maxPrice = recording;
                    }
                }
                System.out.println(stockName + "--" + maxPrice);
                stock.set(stockName);
                price.set(maxPrice);
                context.write(stock, price);
            }
        }

        public static void main(String[] args) throws Exception {

            final Configuration conf = new Configuration();
            HBaseConfiguration.addHbaseResources(conf);
            conf.set(HConstants.ZOOKEEPER_QUORUM, zkUrl); // zkUrl = ZooKeeper quorum of the HBase cluster
            final Job job = Job.getInstance(conf, "stock-stats-job");

            // We can either specify a selectQuery or ignore it when we would like to retrieve all the columns
            final String selectQuery = "SELECT STOCK_NAME,RECORDING_YEAR,RECORDINGS_QUARTER FROM STOCK ";

            // StockWritable is the DBWritable class that enables us to process the Result of the above query
            PhoenixMapReduceUtil.setInput(job, StockWritable.class, "STOCK", selectQuery);

            // Set the target Phoenix table and the columns
            PhoenixMapReduceUtil.setOutput(job, "STOCK_STATS", "STOCK_NAME,MAX_RECORDING");

            job.setMapperClass(StockMapper.class);
            job.setReducerClass(StockReducer.class);
            job.setOutputFormatClass(PhoenixOutputFormat.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(StockWritable.class);
            TableMapReduceUtil.addDependencyJars(job);
            job.waitForCompletion(true);
        }
    }
    

    StockReducer.java

    public class StockReducer extends Reducer<Text, DoubleWritable, NullWritable, StockWritable> {

        protected void reduce(Text key, Iterable<DoubleWritable> recordings, Context context) throws IOException, InterruptedException {
            double maxPrice = Double.MIN_VALUE;
            System.out.println(recordings);
            for (DoubleWritable recording : recordings) {
                System.out.println("R-" + key + "-" + recording);
                if (maxPrice < recording.get()) {
                    maxPrice = recording.get();
                }
            }
            final StockWritable stock = new StockWritable();
            stock.setStockName(key.toString());
            stock.setMaxPrice(maxPrice);
            System.out.println(key + "--" + maxPrice);
            context.write(NullWritable.get(), stock);
        }
    }
    

    StockWritable.java

    public class StockWritable implements DBWritable, Writable {

            private String stockName;

            private int year;

            private double[] recordings;

            private double maxPrice;
    
            // These Writable hooks are left empty in the original answer; they are not
            // exercised in this job because StockWritable is only (de)serialized through
            // the Phoenix JDBC methods below.
            public void readFields(DataInput input) throws IOException {

            }

            public void write(DataOutput output) throws IOException {

            }
    
            public void readFields(ResultSet rs) throws SQLException {
               stockName = rs.getString("STOCK_NAME");
               setYear(rs.getInt("RECORDING_YEAR"));
               final Array recordingsArray = rs.getArray("RECORDINGS_QUARTER");
               setRecordings((double[])recordingsArray.getArray());
            }
    
            public void write(PreparedStatement pstmt) throws SQLException {
               pstmt.setString(1, stockName);
               pstmt.setDouble(2, maxPrice); 
            }
    
            public int getYear() {
                return year;
            }
    
            public void setYear(int year) {
                this.year = year;
            }
    
            public double[] getRecordings() {
                return recordings;
            }
    
            public void setRecordings(double[] recordings) {
                this.recordings = recordings;
            }
    
            public double getMaxPrice() {
                return maxPrice;
            }
    
            public void setMaxPrice(double maxPrice) {
                this.maxPrice = maxPrice;
            }
    
            public String getStockName() {
                return stockName;
            }
    
            public void setStockName(String stockName) {
                this.stockName = stockName;
            }
    
    
    }
    
  3. Source: https://stackoverflow.com/questions/13578097/example-for-running-mapreduce-on-hdfs-files-and-storing-reducer-results-in-hbase (cc-by-sa and MIT license)