복붙노트

[HADOOP] Hadoop 다중 입력

HADOOP

Hadoop 다중 입력

hadoop map reduce를 사용하고 있으며 두 개의 파일을 계산하려고합니다. 첫 번째 Map / Reduce 반복은 다음과 같이 쌍 ID 번호가있는 파일을 제공합니다.

A 30
D 20

내 목표는 파일에서 ID를 사용하여 다른 파일과 연결하고 다음과 같이 ID, 숫자, 이름과 같은 다른 출력을 트리오로 사용하는 것입니다.

A ABC 30
D EFGH 20

그러나 Map Reduce를 사용하는 것이 최선의 방법인지 확실하지 않습니다. 예를 들어 파일 판독기를 사용하여 두 번째 입력 파일을 읽고 ID로 이름을 얻는 것이 더 낫지 않습니까? Map Reduce로 할 수 있습니까?

그렇다면, 나는 어떻게 발견하려고 노력하고있다. 나는 MultipleInput 솔루션을 시도했다 :

MultipleInputs.addInputPath(job2, new Path(args[1]+"-tmp"),
    TextInputFormat.class, FlightsByCarrierMapper2.class);
MultipleInputs.addInputPath(job2, new Path("inputplanes"),
    TextInputFormat.class, FlightsModeMapper.class); 

그러나 두 가지를 결합하여 원하는 출력을 얻는 방법은 생각할 수 없습니다. 지금 당장 가지고있는 방법은이 예제와 같은 목록을 나에게 줄뿐입니다.

A ABC
A 30
B ABCD
C ABCDEF
D EFGH
D 20

나의 마지막 감속 후에 나는 이것을 얻고있다 :

N125DL  767-332
N125DL  7   , 
N126AT  737-76N
N126AT  19  , 
N126DL  767-332
N126DL  1   , 
N127DL  767-332
N127DL  7   , 
N128DL  767-332
N128DL  3

나는 이것을 원한다 : N127DL 7 767-332. 또한, 나는 결합하지 않는 것을 원하지 않습니다.

그리고 이것은 제 수업입니다.

공용 클래스 FlightsByCarrierReducer2는 감속기를 확장합니다 {

String merge = "";
protected void reduce(Text token, Iterable<Text> values, Context context) 
                            throws IOException, InterruptedException {

    int i = 0;  
    for(Text value:values)
    {
        if(i == 0){
            merge = value.toString()+",";
        }
        else{
            merge += value.toString();
        }
        i++;
    }

        context.write(token, new Text(merge));

}

}

최신 정보:

http://stat-computing.org/dataexpo/2009/the-data.html 이것이 제가 사용하고있는 예입니다.

나는 노력하고있다 : TailNum과 Cancelled는 (1 또는 0) TailNum에 해당하는 모델 이름을 얻는다. 모델이있는 파일에는 TailNumb, Model 및 기타 항목이 있습니다. 내 현재 출력 :

N193JB ERJ 190-100 IGW

N194DN 767-332

N19503 EMB-135ER

N19554 EMB-145LR

N195DN 767-332

N195DN 2

먼저 열쇠가오고, 두 번째 모델은 비행이 취소 된 열쇠이며 모델 아래에 나타납니다.

그리고 모델 당 취소 수를 원하기 때문에 취소 된 모델 번호, 트리오 키를 원합니다.

해결법

  1. ==============================

    1.두 맵퍼의 키로 ID를 사용하여 이들을 조인 할 수 있습니다. 지도 작업을 다음과 같이 작성할 수 있습니다.

    두 맵퍼의 키로 ID를 사용하여 이들을 조인 할 수 있습니다. 지도 작업을 다음과 같이 작성할 수 있습니다.

    public void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException
    {
        //Get the line
        //split the line to get ID seperate
        //word1 = A 
        //word2 = 30
                    //Likewise for A ABC
                       //word1 = A 
                      //word2 = ABC
        context.write(word1, word2);
    }
    

    나는 당신이 같은지도 작업을 취소 할 수 있다고 생각합니다. 그런 다음 Hadoop Framework가 핵심 기반으로 데이터를 그룹화하는 Reducer 작업을 작성하십시오. 그러면 ID를 키로 사용할 수 있습니다. 그리고 값 중 하나를 캐쉬 할 수 있습니다.

    String merge = "";
    public void reduce(Text key, Iterable<Text> values, Context context)
    {
        int i =0;
        for(Text value:values)
        {
            if(i == 0){
                merge = value.toString()+",";
            }
            else{
                merge += value.toString();
            }
            i++;
        }
        valEmit.set(merge);
        context.write(key, valEmit);
    }
    

    마지막으로 Driver 클래스를 작성할 수 있습니다.

    public int run(String[] args) throws Exception {
     Configuration c=new Configuration();
     String[] files=new GenericOptionsParser(c,args).getRemainingArgs();
     Path p1=new Path(files[0]);
     Path p2=new Path(files[1]);
     Path p3=new Path(files[2]);
     FileSystem fs = FileSystem.get(c);
     if(fs.exists(p3)){
      fs.delete(p3, true);
      }
     Job job = new Job(c,"Multiple Job");
     job.setJarByClass(MultipleFiles.class);
     MultipleInputs.addInputPath(job, p1, TextInputFormat.class, MultipleMap1.class);
     MultipleInputs.addInputPath(job,p2, TextInputFormat.class, MultipleMap2.class);
     job.setReducerClass(MultipleReducer.class);
     .
     .
    }
    

    여기에서 예제를 찾을 수 있습니다.

    희망이 도움이됩니다.

    최신 정보

    입력 1

    A 30
    D 20
    

    입력 2

    A ABC
    D EFGH
    

    산출

    A ABC 30
    D EFGH 20
    

    Mapper.java

    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /**
     * @author sreeveni
     *
     */
    public class Mapper1 extends Mapper<LongWritable, Text, Text, Text> {
        Text keyEmit = new Text();
        Text valEmit = new Text();
    
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String parts[] = line.split(" ");
            keyEmit.set(parts[0]);
            valEmit.set(parts[1]);
            context.write(keyEmit, valEmit);
        }
    }
    

    Reducer.java

    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * @author sreeveni
     *
     */
    public class ReducerJoin extends Reducer<Text, Text, Text, Text> {
    
        Text valEmit = new Text();
        String merge = "";
    
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String character = "";
            String number = "";
            for (Text value : values) {
                // ordering output
                String val = value.toString();
                char myChar = val.charAt(0);
    
                if (Character.isDigit(myChar)) {
                    number = val;
                } else {
                    character = val;
                }
            }
            merge = character + " " + number;
            valEmit.set(merge);
            context.write(key, valEmit);
        }
    
    }
    

    드라이버 클래스

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * @author sreeveni
     *
     */
    public class Driver extends Configured implements Tool {
        public static void main(String[] args) throws Exception {
            // TODO Auto-generated method stub
            // checking the arguments count
    
            if (args.length != 3) {
                System.err
                        .println("Usage : <inputlocation>  <inputlocation>  <outputlocation> ");
                System.exit(0);
            }
            int res = ToolRunner.run(new Configuration(), new Driver(), args);
            System.exit(res);
    
        }
    
        @Override
        public int run(String[] args) throws Exception {
            // TODO Auto-generated method stub
            String source1 = args[0];
            String source2 = args[1];
            String dest = args[2];
            Configuration conf = new Configuration();
            conf.set("mapred.textoutputformat.separator", " "); // changing default
                                                                // delimiter to user
                                                                // input delimiter
            FileSystem fs = FileSystem.get(conf);
            Job job = new Job(conf, "Multiple Jobs");
    
            job.setJarByClass(Driver.class);
            Path p1 = new Path(source1);
            Path p2 = new Path(source2);
            Path out = new Path(dest);
            MultipleInputs.addInputPath(job, p1, TextInputFormat.class,
                    Mapper1.class);
            MultipleInputs.addInputPath(job, p2, TextInputFormat.class,
                    Mapper1.class);
            job.setReducerClass(ReducerJoin.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
            job.setOutputFormatClass(TextOutputFormat.class);
    
            /*
             * delete if exist
             */
            if (fs.exists(out))
                fs.delete(out, true);
    
            TextOutputFormat.setOutputPath(job, out);
            boolean success = job.waitForCompletion(true);
    
            return success ? 0 : 1;
        }
    
    }
    
  2. ==============================

    2.귀하의 감속기는지도 방법을 가지고 있지만, 합병하는 값의 반복 가능한 집합을 취하는 reduce 방법을 가져야합니다. reduce () 메서드가 없기 때문에 모든 키 / 값 쌍을 통과하는 기본 비헤이비어를 얻습니다.

    귀하의 감속기는지도 방법을 가지고 있지만, 합병하는 값의 반복 가능한 집합을 취하는 reduce 방법을 가져야합니다. reduce () 메서드가 없기 때문에 모든 키 / 값 쌍을 통과하는 기본 비헤이비어를 얻습니다.

  3. from https://stackoverflow.com/questions/27349743/hadoop-multiple-inputs by cc-by-sa and MIT license