복붙노트

[HADOOP] 감속기에서 매퍼 카운터에 액세스

HADOOP

감속기에서 매퍼 카운터에 액세스

내 감속재에있는 내 매퍼에서 카운터에 액세스해야합니다. 이것이 가능한가? 그렇다면 어떻게해야할까요?

예로서: 내 매퍼입니다 :

public class CounterMapper extends Mapper<Text,Text,Text,Text> {

    static enum TestCounters { TEST }

    @Override
    protected void map(Text key, Text value, Context context)
                    throws IOException, InterruptedException {
        context.getCounter(TestCounters.TEST).increment(1);
        context.write(key, value);
    }
}

내 감속기는

public class CounterReducer extends Reducer<Text,Text,Text,LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
                        throws IOException, InterruptedException {
        Counter counter = context.getCounter(CounterMapper.TestCounters.TEST);
        long counterValue = counter.getValue();
        context.write(key, new LongWritable(counterValue));
    }
}

counterValue는 항상 0입니다. 제가 잘못한 일을하고 있습니까? 그렇지 않습니다.

해결법

  1. ==============================

    1.감속기의 configure (JobConf)에서 JobConf 객체를 사용하여 감속기 자신의 작업 ID를 조회 할 수 있습니다. 그러면 감속기가 JobClient (예 : jobtracker에 대한 연결)를 만들고이 작업 (또는 해당 작업에 대한 작업)에 대한 카운터를 쿼리 할 수 ​​있습니다.

    감속기의 configure (JobConf)에서 JobConf 객체를 사용하여 감속기 자신의 작업 ID를 조회 할 수 있습니다. 그러면 감속기가 JobClient (예 : jobtracker에 대한 연결)를 만들고이 작업 (또는 해당 작업에 대한 작업)에 대한 카운터를 쿼리 할 수 ​​있습니다.

    // in the Reducer class...
    private long mapperCounter;
    
    @Override
    public void configure(JobConf conf) {
        JobClient client = new JobClient(conf);
        RunningJob parentJob = 
            client.getJob(JobID.forName( conf.get("mapred.job.id") ));
        mapperCounter = parentJob.getCounters().getCounter(MAP_COUNTER_NAME);
    }
    

    이제 reduce () 메소드 자체에서 mapperCounter를 사용할 수 있습니다.

    실제로 여기에서 try-catch가 필요합니다. 이전 API를 사용하고 있지만 새 API를 적용하기가 쉽지 않습니다.

    감속기가 시작되기 전에 매퍼의 카운터는 모두 마무리되어야한다는 점에 유의하십시오. 저스틴 토마스의 의견과 달리 정확한 값을 얻어야합니다 (감속기가 같은 카운터를 늘리지 않는 한!).

  2. ==============================

    2.새 API에 대한 Jeff G의 솔루션 구현 :

    새 API에 대한 Jeff G의 솔루션 구현 :

        @Override
        public void setup(Context context) throws IOException, InterruptedException{
            Configuration conf = context.getConfiguration();
            Cluster cluster = new Cluster(conf);
            Job currentJob = cluster.getJob(context.getJobID());
            mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME).getValue();  
        }
    
  3. ==============================

    3.map / reduce의 핵심은 작업을 병렬 처리하는 것입니다. 많은 맵퍼 / 리듀서가있을 것이므로 map / reduce 쌍의 실행을 제외하고는 값이 정확하지 않을 것입니다.

    map / reduce의 핵심은 작업을 병렬 처리하는 것입니다. 많은 맵퍼 / 리듀서가있을 것이므로 map / reduce 쌍의 실행을 제외하고는 값이 정확하지 않을 것입니다.

    단어 수를 예로들 수 있습니다.

    http://wiki.apache.org/hadoop/WordCount

    context.write (word, one)를 context.write (line, one)로 변경할 수 있습니다.

  4. ==============================

    4.전역 카운터 값은 각 매퍼 또는 감속기로 다시 브로드 캐스트되지 않습니다. 감속기에 # 매퍼 레코드를 사용할 수있게하려면이 작업을 수행하기 위해 외부 메커니즘을 사용해야합니다.

    전역 카운터 값은 각 매퍼 또는 감속기로 다시 브로드 캐스트되지 않습니다. 감속기에 # 매퍼 레코드를 사용할 수있게하려면이 작업을 수행하기 위해 외부 메커니즘을 사용해야합니다.

  5. ==============================

    5.나는이 질문을했지만 내 문제는 해결하지 못했다. 그러나 대체 솔루션이 내 마음에 왔습니다. 매퍼에서 단어 수를 계산하고 매퍼의 끝을 실행하는 정리 함수에서 최소값 키 (이 값이 머리에 있음)로 중간 출력에 쓸 수 있습니다. 감속기에서 단어의 수는 머리에 값을 추가하여 계산됩니다. 샘플 코드와 그 출력물의 일부는 아래에 있습니다.

    나는이 질문을했지만 내 문제는 해결하지 못했다. 그러나 대체 솔루션이 내 마음에 왔습니다. 매퍼에서 단어 수를 계산하고 매퍼의 끝을 실행하는 정리 함수에서 최소값 키 (이 값이 머리에 있음)로 중간 출력에 쓸 수 있습니다. 감속기에서 단어의 수는 머리에 값을 추가하여 계산됩니다. 샘플 코드와 그 출력물의 일부는 아래에 있습니다.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    /**
     * Created by tolga on 1/26/16.
     */
    public class WordCount {
        static enum TestCounters { TEST }
        public static class Map extends Mapper<Object, Text, Text, LongWritable> {
            private final static LongWritable one = new LongWritable(1);
            private Text word = new Text();
    
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                while (tokenizer.hasMoreTokens()) {
                    word.set(tokenizer.nextToken());
                    context.write(word, one);
                    context.getCounter(TestCounters.TEST).increment(1);
                }
            }
    
            @Override
            protected void cleanup(Context context) throws IOException, InterruptedException {
                context.write(new Text("!"),new LongWritable(context.getCounter(TestCounters.TEST).getValue()));
            }
        }
    
        public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {
    
            public void reduce(Text key, Iterable<LongWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (LongWritable val : values) {
                    sum += val.get();
                }
                context.write(key, new LongWritable(sum));
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
    
            Job job = new Job(conf, "WordCount");
            job.setJarByClass(WordCount.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
    
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
    
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            job.waitForCompletion(true);
        }
    }
    

    텍스트 파일 :

    Turgut Özal University is a private university located in Ankara, Turkey. It was established in 2008 by the Turgut Özal Thought and Action Foundation and is named after former Turkish president Turgut Özal.
    

    중간 출력

    **! 33 ** 2008 (1) 조치 1 앙카라, 1 재단 1 그것은 1 일을 생각 Turgut 1 Turgut 1 Turgut 1

    **! 33 ** 2008 1 액션 1 앙카라, 1 파운데이션 1 그것 1 생각 1 투르 구트 3

  6. ==============================

    6.findCounter (COUNTER_NAME)은 (는) 더 이상 지원되지 않습니다. - https://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/mapred/Counters.html

    findCounter (COUNTER_NAME)은 (는) 더 이상 지원되지 않습니다. - https://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/mapred/Counters.html

    @Override
    public void setup(Context context) throws IOException, InterruptedException{
        Configuration conf = context.getConfiguration();
        Cluster cluster = new Cluster(conf);
        Job currentJob = cluster.getJob(context.getJobID());
        mapperCounter = currentJob.getCounters().findCounter(GROUP_NAME, COUNTER_NAME).getValue();  
    }
    

    카운터가 호출 될 때 GROUP_NAME이 (가) 지정됩니다. 예 :

    context.getCounter("com.example.mycode", "MY_COUNTER").increment(1);
    

    그때

    mapperCounter = currentJob.getCounters().findCounter("com.example.mycode", "MY_COUNTER").getValue();  
    

    또한 중요한 점은 카운터가 존재하지 않으면 값이 0 인 카운터를 초기화한다는 것입니다.

  7. from https://stackoverflow.com/questions/5450290/accessing-a-mappers-counter-from-a-reducer by cc-by-sa and MIT license