감속기에서 매퍼 카운터에 액세스

내 감속재에있는 내 매퍼에서 카운터에 액세스해야합니다. 이것이 가능한가? 그렇다면 어떻게해야할까요?

예로서: 내 매퍼입니다 :

public class CounterMapper extends Mapper<Text,Text,Text,Text> {

    static enum TestCounters { TEST }

    protected void map(Text key, Text value, Context context)
                    throws IOException, InterruptedException {
        context.write(key, value);

내 감속기는

public class CounterReducer extends Reducer<Text,Text,Text,LongWritable> {

    protected void reduce(Text key, Iterable<Text> values, Context context)
                        throws IOException, InterruptedException {
        Counter counter = context.getCounter(CounterMapper.TestCounters.TEST);
        long counterValue = counter.getValue();
        context.write(key, new LongWritable(counterValue));

counterValue는 항상 0입니다. 제가 잘못한 일을하고 있습니까? 그렇지 않습니다.


    1.감속기의 configure (JobConf)에서 JobConf 객체를 사용하여 감속기 자신의 작업 ID를 조회 할 수 있습니다. 그러면 감속기가 JobClient (예 : jobtracker에 대한 연결)를 만들고이 작업 (또는 해당 작업에 대한 작업)에 대한 카운터를 쿼리 할 수 ​​있습니다.

    // in the Reducer class...
    private long mapperCounter;
    public void configure(JobConf conf) {
        JobClient client = new JobClient(conf);
        RunningJob parentJob = 
            client.getJob(JobID.forName( conf.get("mapred.job.id") ));
        mapperCounter = parentJob.getCounters().getCounter(MAP_COUNTER_NAME);

    이제 reduce () 메소드 자체에서 mapperCounter를 사용할 수 있습니다.

    실제로 여기에서 try-catch가 필요합니다. 이전 API를 사용하고 있지만 새 API를 적용하기가 쉽지 않습니다.

    감속기가 시작되기 전에 매퍼의 카운터는 모두 마무리되어야한다는 점에 유의하십시오. 저스틴 토마스의 의견과 달리 정확한 값을 얻어야합니다 (감속기가 같은 카운터를 늘리지 않는 한!).

    2.새 API에 대한 Jeff G의 솔루션 구현 :

        public void setup(Context context) throws IOException, InterruptedException{
            Configuration conf = context.getConfiguration();
            Cluster cluster = new Cluster(conf);
            Job currentJob = cluster.getJob(context.getJobID());
            mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME).getValue();  
    3.map / reduce의 핵심은 작업을 병렬 처리하는 것입니다. 많은 맵퍼 / 리듀서가있을 것이므로 map / reduce 쌍의 실행을 제외하고는 값이 정확하지 않을 것입니다.

    단어 수를 예로들 수 있습니다.


    context.write (word, one)를 context.write (line, one)로 변경할 수 있습니다.

    4.전역 카운터 값은 각 매퍼 또는 감속기로 다시 브로드 캐스트되지 않습니다. 감속기에 # 매퍼 레코드를 사용할 수있게하려면이 작업을 수행하기 위해 외부 메커니즘을 사용해야합니다.

    5.나는이 질문을했지만 내 문제는 해결하지 못했다. 그러나 대체 솔루션이 내 마음에 왔습니다. 매퍼에서 단어 수를 계산하고 매퍼의 끝을 실행하는 정리 함수에서 최소값 키 (이 값이 머리에 있음)로 중간 출력에 쓸 수 있습니다. 감속기에서 단어의 수는 머리에 값을 추가하여 계산됩니다. 샘플 코드와 그 출력물의 일부는 아래에 있습니다.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import java.io.IOException;
    import java.util.StringTokenizer;
     * Created by tolga on 1/26/16.
    public class WordCount {
        static enum TestCounters { TEST }
        public static class Map extends Mapper<Object, Text, Text, LongWritable> {
            private final static LongWritable one = new LongWritable(1);
            private Text word = new Text();
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                while (tokenizer.hasMoreTokens()) {
                    context.write(word, one);
            protected void cleanup(Context context) throws IOException, InterruptedException {
                context.write(new Text("!"),new LongWritable(context.getCounter(TestCounters.TEST).getValue()));
        public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {
            public void reduce(Text key, Iterable<LongWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (LongWritable val : values) {
                    sum += val.get();
                context.write(key, new LongWritable(sum));
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "WordCount");
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

    텍스트 파일 :

    Turgut Özal University is a private university located in Ankara, Turkey. It was established in 2008 by the Turgut Özal Thought and Action Foundation and is named after former Turkish president Turgut Özal.

    중간 출력

    **! 33 ** 2008 (1) 조치 1 앙카라, 1 재단 1 그것은 1 일을 생각 Turgut 1 Turgut 1 Turgut 1

    **! 33 ** 2008 1 액션 1 앙카라, 1 파운데이션 1 그것 1 생각 1 투르 구트 3

    6.findCounter (COUNTER_NAME)은 (는) 더 이상 지원되지 않습니다. - https://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/mapred/Counters.html

    public void setup(Context context) throws IOException, InterruptedException{
        Configuration conf = context.getConfiguration();
        Cluster cluster = new Cluster(conf);
        Job currentJob = cluster.getJob(context.getJobID());
        mapperCounter = currentJob.getCounters().findCounter(GROUP_NAME, COUNTER_NAME).getValue();  

    카운터가 호출 될 때 GROUP_NAME이 (가) 지정됩니다. 예 :

    context.getCounter("com.example.mycode", "MY_COUNTER").increment(1);


    mapperCounter = currentJob.getCounters().findCounter("com.example.mycode", "MY_COUNTER").getValue();  

    또한 중요한 점은 카운터가 존재하지 않으면 값이 0 인 카운터를 초기화한다는 것입니다.

