[HADOOP] Hadoop ChainMapper, ChainReducer [복제]
HADOOPHadoop ChainMapper, ChainReducer [복제]
저는 Hadoop에 비교적 익숙하지 않고 ChainMapper, ChainReducer를 사용하여 프로그래밍 방식으로 작업 (여러 매퍼, 축소 기)을 연결하는 방법을 알아 내려고합니다. 나는 몇 가지 부분적인 예제를 찾았지만 하나의 완전한 예제는 찾지 못했습니다.
현재 테스트 코드는 다음과 같습니다.
public class ChainJobs extends Configured implements Tool {
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}
public static class Map2 extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken().concat("Justatest"));
output.collect(word, one);
}
}
}
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
@Override
public int run(String[] args) {
Configuration conf = getConf();
JobConf job = new JobConf(conf);
job.setJobName("TestforChainJobs");
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
JobConf map1Conf = new JobConf(false);
ChainMapper.addMapper(job, Map.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, map1Conf);
JobConf map2Conf = new JobConf(false);
ChainMapper.addMapper(job, Map2.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, map2Conf);
JobConf reduceConf = new JobConf(false);
ChainReducer.setReducer(job, Reduce.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);
JobClient.runJob(job);
return 0;
}
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new ChainJobs(), args);
System.exit(res);
}
하지만
MapAttempt TASK_TYPE="MAP" TASKID="task_201210162337_0009_m_000000" TASK_ATTEMPT_ID="attempt_201210162337_0009_m_000000_0" TASK_STATUS="FAILED" FINISH_TIME="1350397216365" HOSTNAME="localhost\.localdomain" ERROR="java\.lang\.RuntimeException: Error in configuring object
at org\.apache\.hadoop\.util\.ReflectionUtils\.setJobConf(ReflectionUtils\.java:106)
at org\.apache\.hadoop\.util\.ReflectionUtils\.setConf(ReflectionUtils\.java:72)
at org\.apache\.hadoop\.util\.ReflectionUtils\.newInstance(ReflectionUtils\.java:130)
at org\.apache\.hadoop\.mapred\.MapTask\.runOldMapper(MapTask\.java:389)
at org\.apache\.hadoop\.mapred\.MapTask\.run(MapTask\.java:327)
at org\.apache\.hadoop\.mapred\.Child$4\.run(Child\.java:268)
at java\.security\.AccessController\.doPrivileged(Native Method)
at javax\.security\.auth\.Subject\.doAs(Subject\.java:396)
모든 힌트 또는 매우 간단한 작업 예제가 많이 감사합니다.
해결법
-
==============================
1.체인 매퍼 (chain mapper)를 기반으로 한 단어 수를 코딩했습니다. 이 코드는 새로운 API와 잘 작동하는 것으로 작성되었습니다. :)
체인 매퍼 (chain mapper)를 기반으로 한 단어 수를 코딩했습니다. 이 코드는 새로운 API와 잘 작동하는 것으로 작성되었습니다. :)
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.chain.ChainMapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; //implementing CHAIN MAPREDUCE without using custom format //SPLIT MAPPER class SplitMapper extends Mapper<Object,Text,Text,IntWritable> { private IntWritable dummyValue=new IntWritable(1); //private String content; private String tokens[]; @Override public void map(Object key,Text value,Context context)throws IOException,InterruptedException{ tokens=value.toString().split(" "); for(String x:tokens) { context.write(new Text(x), dummyValue); } } } //UPPER CASE MAPPER class UpperCaseMapper extends Mapper<Text,IntWritable,Text,IntWritable> { @Override public void map(Text key,IntWritable value,Context context)throws IOException,InterruptedException{ String val=key.toString().toUpperCase(); Text newKey=new Text(val); context.write(newKey, value); } } //ChainMapReducer class ChainMapReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private int sum=0; @Override public void reduce(Text key,Iterable<IntWritable>values,Context context)throws IOException,InterruptedException{ for(IntWritable value:values) { sum+=value.get(); } context.write(key, new IntWritable(sum)); } } public class FirstClass extends Configured implements Tool{ static Configuration cf; public int run (String args[])throws IOException,InterruptedException,ClassNotFoundException{ cf=new Configuration(); //bypassing the GenericOptionsParser part and directly running into job declaration part Job j=Job.getInstance(cf); /**************CHAIN MAPPER AREA STARTS********************************/ Configuration splitMapConfig=new Configuration(false); //below we add the 1st mapper class under ChainMapper Class ChainMapper.addMapper(j, SplitMapper.class, Object.class, Text.class, Text.class, IntWritable.class, splitMapConfig); //configuration for second mapper Configuration upperCaseConfig=new Configuration(false); //below we add the 2nd mapper that is the lower case mapper to the Chain Mapper class ChainMapper.addMapper(j, UpperCaseMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, upperCaseConfig); /**************CHAIN MAPPER AREA FINISHES********************************/ //now proceeding with the normal delivery j.setJarByClass(FirstClass.class); j.setCombinerClass(ChainMapReducer.class); j.setOutputKeyClass(Text.class); j.setOutputValueClass(IntWritable.class); Path p=new Path(args[1]); //set the input and output URI FileInputFormat.addInputPath(j, new Path(args[0])); FileOutputFormat.setOutputPath(j, p); p.getFileSystem(cf).delete(p, true); return j.waitForCompletion(true)?0:1; } public static void main(String args[])throws Exception{ int res=ToolRunner.run(cf, new FirstClass(), args); System.exit(res); } }
출력의 일부가 아래에 표시되었습니다.
A 619 ACCORDING 636 ACCOUNT 638 ACROSS? 655 ADDRESSES 657 AFTER 674 AGGREGATING, 687 AGO, 704 ALL 721 ALMOST 755 ALTERING 768 AMOUNT 785 AN 819 ANATOMY 820 AND 1198 ANXIETY 1215 ANY 1232 APACHE 1300 APPENDING 1313 APPLICATIONS 1330 APPLICATIONS. 1347 APPLICATIONS.� 1364 APPLIES 1381 ARCHITECTURE, 1387 ARCHIVES 1388 ARE 1405 AS 1422 BASED 1439
구두점을 제거하기 위해 클렌징을 사용하지 않았으므로 특수 문자 또는 원치 않는 문자가 표시 될 수 있습니다. 나는 체인 매퍼 (chain mapper)의 작업에만 집중했다. 감사 :)
from https://stackoverflow.com/questions/12926474/hadoop-chainmapper-chainreducer by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] mapreduce 작업의 맵 단계 출력은 항상 정렬됩니까? (0) | 2019.08.05 |
---|---|
[HADOOP] 다른 배치 기간으로 여러 Spark Streaming 작업을 어떻게 설정합니까? (0) | 2019.08.05 |
[HADOOP] 플러그인 저장소에 플러그인이 없습니다. - 회사 Nexus가 다운되었을 때 문제를 해결하는 방법은 무엇입니까? (0) | 2019.08.05 |
[HADOOP] 하이브는 스파크보다 빠릅니까? (0) | 2019.08.05 |
[HADOOP] oozie 작업 이름의 이름을 동적으로 바꿀 수 있습니까? (0) | 2019.08.05 |