[HADOOP] Example of running MapReduce on HDFS files and storing the reducer results in an HBase table
Can someone point me to a good example of HBase with MapReduce? My requirement is to run MapReduce over an HDFS file and store the reducer output in an HBase table. The mapper input will be an HDFS file and its output will be Text/IntWritable key-value pairs. The reducer output will be a Put object, i.e., the reducer should add up the Iterable<IntWritable> values and store the result in the HBase table.
Solution
==============================
1. The following code will solve your problem:
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf, "JOB_NAME");
job.setJarByClass(yourclass.class);
job.setMapperClass(yourMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(inputPath));
TableMapReduceUtil.initTableReducerJob(TABLE, yourReducer.class, job);
job.setReducerClass(yourReducer.class);
job.waitForCompletion(true);

class yourMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // @Override map()
}

class yourReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    // @Override reduce()
}
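Fleshing out that skeleton, here is a minimal, self-contained sketch that matches the question's requirement: the mapper emits Text/IntWritable pairs from an HDFS file, and the reducer sums the Iterable<IntWritable> values and writes one Put per key. The class names, the target table "word_counts", and the column family/qualifier "cf:count" are placeholders of mine, not from the original answer, and Put.addColumn() assumes the HBase 1.x+ client API:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WordCountToHBase {

    // Mapper: reads lines from an HDFS file, emits (word, 1) pairs.
    public static class TokenMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String token : value.toString().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }

    // Reducer: sums the counts for each word and writes one Put per word
    // into the HBase table configured via initTableReducerJob().
    public static class PutReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));            // row key = the word
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"),   // cf:count = total
                    Bytes.toBytes(sum));
            context.write(new ImmutableBytesWritable(put.getRow()), put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "wordcount-to-hbase");
        job.setJarByClass(WordCountToHBase.class);
        job.setMapperClass(TokenMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Binds the reducer to the target HBase table and sets TableOutputFormat.
        TableMapReduceUtil.initTableReducerJob("word_counts", PutReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Note that initTableReducerJob() configures TableOutputFormat against the named table behind the scenes, which is why the driver never calls setOutputFormatClass() or FileOutputFormat.setOutputPath().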
==============================
2. **Check the code below, which works well for me with Phoenix HBase and MapReduce:**
This program reads data from an HBase table and, after the map-reduce job, inserts the results into another table.
Tables: STOCK (input) -> STOCK_STATS (output)
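The original answer does not include the schemas for these two tables. For context, here is a hedged sketch of what the DDL might look like, issued through the Phoenix JDBC driver. The column names come from the SELECT query and the write(PreparedStatement) method in the code below, but the exact types, the primary keys, and the "localhost" quorum are assumptions of mine:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateStockTables {
    public static void main(String[] args) throws Exception {
        // "localhost" is a placeholder for the real ZooKeeper quorum.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
             Statement stmt = conn.createStatement()) {
            // Source table: one row per stock per year, with the quarterly
            // recordings stored as a Phoenix array (read back as double[]).
            stmt.execute("CREATE TABLE IF NOT EXISTS STOCK ("
                    + " STOCK_NAME VARCHAR NOT NULL,"
                    + " RECORDING_YEAR INTEGER NOT NULL,"
                    + " RECORDINGS_QUARTER DOUBLE ARRAY,"
                    + " CONSTRAINT pk PRIMARY KEY (STOCK_NAME, RECORDING_YEAR))");
            // Target table: the job writes STOCK_NAME and MAX_RECORDING into it.
            stmt.execute("CREATE TABLE IF NOT EXISTS STOCK_STATS ("
                    + " STOCK_NAME VARCHAR NOT NULL PRIMARY KEY,"
                    + " MAX_RECORDING DOUBLE)");
        }
    }
}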
StockComputationJob.java
public class StockComputationJob {

    public static class StockMapper extends Mapper<NullWritable, StockWritable, Text, DoubleWritable> {

        private Text stock = new Text();
        private DoubleWritable price = new DoubleWritable();

        @Override
        protected void map(NullWritable key, StockWritable stockWritable, Context context)
                throws IOException, InterruptedException {
            double[] recordings = stockWritable.getRecordings();
            final String stockName = stockWritable.getStockName();
            System.out.println("Map-" + recordings);
            double maxPrice = Double.MIN_VALUE;
            for (double recording : recordings) {
                System.out.println("M-" + key + "-" + recording);
                if (maxPrice < recording) {
                    maxPrice = recording;
                }
            }
            System.out.println(stockName + "--" + maxPrice);
            stock.set(stockName);
            price.set(maxPrice);
            context.write(stock, price);
        }
    }

    public static void main(String[] args) throws Exception {
        final Configuration conf = new Configuration();
        HBaseConfiguration.addHbaseResources(conf);
        // zkUrl: the ZooKeeper quorum address, defined elsewhere in the original code
        conf.set(HConstants.ZOOKEEPER_QUORUM, zkUrl);
        final Job job = Job.getInstance(conf, "stock-stats-job");
        // We can either specify a selectQuery or ignore it when we would like to retrieve all the columns
        final String selectQuery = "SELECT STOCK_NAME,RECORDING_YEAR,RECORDINGS_QUARTER FROM STOCK ";
        // StockWritable is the DBWritable class that enables us to process the result of the above query
        PhoenixMapReduceUtil.setInput(job, StockWritable.class, "STOCK", selectQuery);
        // Set the target Phoenix table and the columns
        PhoenixMapReduceUtil.setOutput(job, "STOCK_STATS", "STOCK_NAME,MAX_RECORDING");
        job.setMapperClass(StockMapper.class);
        job.setReducerClass(StockReducer.class);
        job.setOutputFormatClass(PhoenixOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(StockWritable.class);
        TableMapReduceUtil.addDependencyJars(job);
        job.waitForCompletion(true);
    }
}
StockReducer.java
public class StockReducer extends Reducer<Text, DoubleWritable, NullWritable, StockWritable> {

    protected void reduce(Text key, Iterable<DoubleWritable> recordings, Context context)
            throws IOException, InterruptedException {
        double maxPrice = Double.MIN_VALUE;
        System.out.println(recordings);
        for (DoubleWritable recording : recordings) {
            System.out.println("R-" + key + "-" + recording);
            if (maxPrice < recording.get()) {
                maxPrice = recording.get();
            }
        }
        final StockWritable stock = new StockWritable();
        stock.setStockName(key.toString());
        stock.setMaxPrice(maxPrice);
        System.out.println(key + "--" + maxPrice);
        context.write(NullWritable.get(), stock);
    }
}
StockWritable.java
public class StockWritable implements DBWritable, Writable {

    private String stockName;
    private int year;
    private double[] recordings;
    private double maxPrice;

    public void readFields(DataInput input) throws IOException {
    }

    public void write(DataOutput output) throws IOException {
    }

    public void readFields(ResultSet rs) throws SQLException {
        stockName = rs.getString("STOCK_NAME");
        setYear(rs.getInt("RECORDING_YEAR"));
        final Array recordingsArray = rs.getArray("RECORDINGS_QUARTER");
        setRecordings((double[]) recordingsArray.getArray());
    }

    public void write(PreparedStatement pstmt) throws SQLException {
        pstmt.setString(1, stockName);
        pstmt.setDouble(2, maxPrice);
    }

    public int getYear() { return year; }

    public void setYear(int year) { this.year = year; }

    public double[] getRecordings() { return recordings; }

    public void setRecordings(double[] recordings) { this.recordings = recordings; }

    public double getMaxPrice() { return maxPrice; }

    public void setMaxPrice(double maxPrice) { this.maxPrice = maxPrice; }

    public String getStockName() { return stockName; }

    public void setStockName(String stockName) { this.stockName = stockName; }
}
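A note on the two empty methods above: Phoenix's input and output formats only call the DBWritable half of the interface (readFields(ResultSet) and write(PreparedStatement)), and StockWritable is never shuffled between map and reduce in this job, so the Writable half can safely stay blank. If the class ever were used as a map output value, those methods would need real bodies. A minimal sketch, assuming the same four fields, as drop-in replacements for the two empty methods:

public void write(DataOutput output) throws IOException {
    // Serialize every field in a fixed order.
    output.writeUTF(stockName == null ? "" : stockName);
    output.writeInt(year);
    output.writeDouble(maxPrice);
    int n = (recordings == null) ? 0 : recordings.length;
    output.writeInt(n);
    for (int i = 0; i < n; i++) {
        output.writeDouble(recordings[i]);
    }
}

public void readFields(DataInput input) throws IOException {
    // Read the fields back in exactly the same order they were written.
    stockName = input.readUTF();
    year = input.readInt();
    maxPrice = input.readDouble();
    int n = input.readInt();
    recordings = new double[n];
    for (int i = 0; i < n; i++) {
        recordings[i] = input.readDouble();
    }
}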
from https://stackoverflow.com/questions/13578097/example-for-running-mapreduce-on-hdfs-files-and-storing-reducer-results-in-hbase by cc-by-sa and MIT license