
Hadoop ChainMapper, ChainReducer [duplicate]

I am relatively new to Hadoop and I am trying to figure out how to chain jobs (multiple mappers and reducers) programmatically using ChainMapper and ChainReducer. I have found a few partial examples, but not a single complete, working one.

My current test code is as follows:

public class ChainJobs extends Configured implements Tool {

public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }
}

public static class Map2 extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken().concat("Justatest"));
            output.collect(word, one);
        }
    }
}

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

@Override
public int run(String[] args)  {

    Configuration conf = getConf();
    JobConf job = new JobConf(conf);

    job.setJobName("TestforChainJobs");
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    JobConf map1Conf = new JobConf(false);
    ChainMapper.addMapper(job, Map.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, map1Conf);

    JobConf map2Conf = new JobConf(false);
    ChainMapper.addMapper(job, Map2.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, map2Conf);

    JobConf reduceConf = new JobConf(false);
    ChainReducer.setReducer(job, Reduce.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);

    JobClient.runJob(job);
    return 0;

}

public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new ChainJobs(), args);
    System.exit(res);
}

}

However, when I run it, it fails with:

MapAttempt TASK_TYPE="MAP" TASKID="task_201210162337_0009_m_000000" TASK_ATTEMPT_ID="attempt_201210162337_0009_m_000000_0" TASK_STATUS="FAILED" FINISH_TIME="1350397216365" HOSTNAME="localhost\.localdomain" ERROR="java\.lang\.RuntimeException: Error in configuring object
    at org\.apache\.hadoop\.util\.ReflectionUtils\.setJobConf(ReflectionUtils\.java:106)
    at org\.apache\.hadoop\.util\.ReflectionUtils\.setConf(ReflectionUtils\.java:72)
    at org\.apache\.hadoop\.util\.ReflectionUtils\.newInstance(ReflectionUtils\.java:130)
    at org\.apache\.hadoop\.mapred\.MapTask\.runOldMapper(MapTask\.java:389)
    at org\.apache\.hadoop\.mapred\.MapTask\.run(MapTask\.java:327)
    at org\.apache\.hadoop\.mapred\.Child$4\.run(Child\.java:268)
    at java\.security\.AccessController\.doPrivileged(Native Method)
    at javax\.security\.auth\.Subject\.doAs(Subject\.java:396)

Any hints, or a very simple working example, would be much appreciated.

Solution

  1. I coded a word count example based on ChainMapper. It is written against the new API and works fine. :)

    import java.io.IOException;
    
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    //implementing CHAIN MAPREDUCE without using custom format
    
    
    
    
    //SPLIT MAPPER
    class SplitMapper extends Mapper<Object,Text,Text,IntWritable>
    {
        private IntWritable dummyValue=new IntWritable(1);
        //private String content;
        private String tokens[];
        @Override
        public void map(Object key,Text value,Context context)throws IOException,InterruptedException{
            tokens=value.toString().split(" ");
            for(String x:tokens)
            {
            context.write(new Text(x), dummyValue);
            }
        }   
    }
    
    
    
    
    //UPPER CASE MAPPER
    class UpperCaseMapper extends Mapper<Text,IntWritable,Text,IntWritable>
    {
        @Override
        public void map(Text key,IntWritable value,Context context)throws IOException,InterruptedException{
            String val=key.toString().toUpperCase();
            Text newKey=new Text(val);
            context.write(newKey, value);
        }
    }
    
    
    
    //ChainMapReducer
    class ChainMapReducer extends Reducer<Text,IntWritable,Text,IntWritable>
    {
        @Override
        public void reduce(Text key,Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{
            //reset the sum for every key; keeping it as a field would accumulate counts across keys
            int sum=0;
            for(IntWritable value:values)
            {
                sum+=value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
    public class FirstClass extends Configured implements Tool{
        static Configuration cf;
        public int run (String args[])throws IOException,InterruptedException,ClassNotFoundException{
            cf=new Configuration();
    
            //bypassing the GenericOptionsParser part and directly running into job declaration part
            Job j=Job.getInstance(cf);
    
            /**************CHAIN MAPPER AREA STARTS********************************/
            Configuration splitMapConfig=new Configuration(false);
            //below we add the 1st mapper class under ChainMapper Class
            ChainMapper.addMapper(j, SplitMapper.class, Object.class, Text.class, Text.class, IntWritable.class, splitMapConfig);
    
            //configuration for second mapper
            Configuration upperCaseConfig=new Configuration(false);
            //below we add the 2nd mapper, the upper case mapper, to the ChainMapper class
            ChainMapper.addMapper(j, UpperCaseMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, upperCaseConfig);
            /**************CHAIN MAPPER AREA FINISHES********************************/
    
            //now proceeding with the normal delivery
            j.setJarByClass(FirstClass.class);
            j.setCombinerClass(ChainMapReducer.class);
            //also register the class as the reducer; a combiner alone is optional and not guaranteed to run
            j.setReducerClass(ChainMapReducer.class);
            j.setOutputKeyClass(Text.class);
            j.setOutputValueClass(IntWritable.class);
            Path p=new Path(args[1]);
    
            //set the input and output URI
            FileInputFormat.addInputPath(j, new Path(args[0]));
            FileOutputFormat.setOutputPath(j, p);
            p.getFileSystem(cf).delete(p, true);
            return j.waitForCompletion(true)?0:1;
        }
        public static void main(String args[])throws Exception{
            int res=ToolRunner.run(cf, new FirstClass(), args);
            System.exit(res);
        }
    }
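    The question also asks about ChainReducer, which the example above never uses (the reduce class is only registered as a combiner). As a hedged sketch that is not part of the original answer: the new API also provides org.apache.hadoop.mapreduce.lib.chain.ChainReducer, and the driver in run() could declare the reduce step, and even chain further mappers after it, roughly like this:

        // Sketch only: these lines would go inside run(), after the ChainMapper.addMapper calls,
        // and require: import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
        Configuration reduceConf = new Configuration(false);
        // declare the reduce step of the chained job
        ChainReducer.setReducer(j, ChainMapReducer.class,
                Text.class, IntWritable.class,   // reducer input key/value types
                Text.class, IntWritable.class,   // reducer output key/value types
                reduceConf);

        // optionally, one or more mappers can run after the reducer within the same job
        Configuration postMapConf = new Configuration(false);
        ChainReducer.addMapper(j, UpperCaseMapper.class,
                Text.class, IntWritable.class,
                Text.class, IntWritable.class,
                postMapConf);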
    

    Part of the output is shown below:

    A       619
    ACCORDING       636
    ACCOUNT 638
    ACROSS? 655
    ADDRESSES       657
    AFTER   674
    AGGREGATING,    687
    AGO,    704
    ALL     721
    ALMOST  755
    ALTERING        768
    AMOUNT  785
    AN      819
    ANATOMY 820
    AND     1198
    ANXIETY 1215
    ANY     1232
    APACHE  1300
    APPENDING       1313
    APPLICATIONS    1330
    APPLICATIONS.   1347
    APPLICATIONS.�        1364
    APPLIES 1381
    ARCHITECTURE,   1387
    ARCHIVES        1388
    ARE     1405
    AS      1422
    BASED   1439
    

    I did not apply any cleansing to strip punctuation, so special or unwanted characters may show up in the output; I focused only on how the chain mapper works. Thanks :) One possible cleansing tweak is sketched below.
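    For example, a hypothetical variant of SplitMapper.map() (my assumption, not something the original answer does) could strip punctuation with a small regex before emitting each token:

        //possible cleansing variant of SplitMapper.map(): strip punctuation before emitting tokens
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            for (String raw : value.toString().split(" ")) {
                //keep only letters and digits; anything else is removed
                String cleaned = raw.replaceAll("[^\\p{L}\\p{N}]", "");
                if (!cleaned.isEmpty()) {
                    context.write(new Text(cleaned), dummyValue);   // dummyValue is the existing IntWritable(1) field
                }
            }
        }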

  2. from https://stackoverflow.com/questions/12926474/hadoop-chainmapper-chainreducer by cc-by-sa and MIT license