복붙노트

[HADOOP] 카운터 감속기 코드에서 작동하지 않습니다

HADOOP

카운터 감속기 코드에서 작동하지 않습니다

나는 큰 하둡 프로젝트를 진행하고 내가 출력을 감소 만 상위 10 값을 작성해야 작은 KPI이있다. 이 요구 사항을 완료하려면, 나는 카운터를 사용 카운터가 11 같을 때 루프를 중단하지만, 여전히 감속기 HDFS에 모든 값을 기록했다.

이것은 아주 간단 자바 코드,하지만 난 붙어입니다 :(

테스트를 위해, 나는이 작업을 수행하는 하나 개의 독립 실행 형 클래스 (Java 응용 프로그램)을 만들었으며이가 노력하고 있습니다; 이 감속기 코드에서 작동하지 않는 이유를 궁금하네요.

어떤 사람은 나 좀 도와 내가 모르는 뭔가가있는 경우 제안하시기 바랍니다.

package comparableTest;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.IntWritable.Comparator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class ValueSortExp2 {
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration(true);

        String arguments[] = new GenericOptionsParser(conf, args).getRemainingArgs();

        Job job = new Job(conf, "Test commond");
        job.setJarByClass(ValueSortExp2.class);

        // Setup MapReduce
        job.setMapperClass(MapTask2.class);
        job.setReducerClass(ReduceTask2.class);
        job.setNumReduceTasks(1);

        // Specify key / value
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setSortComparatorClass(IntComparator2.class);
        // Input
        FileInputFormat.addInputPath(job, new Path(arguments[0]));
        job.setInputFormatClass(TextInputFormat.class);

        // Output
        FileOutputFormat.setOutputPath(job, new Path(arguments[1]));
        job.setOutputFormatClass(TextOutputFormat.class);


        int code = job.waitForCompletion(true) ? 0 : 1;
        System.exit(code);

    }

    public static class IntComparator2 extends WritableComparator {

        public IntComparator2() {
            super(IntWritable.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

            Integer v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
            Integer v2 = ByteBuffer.wrap(b2, s2, l2).getInt();

            return v1.compareTo(v2) * (-1);
        }
    }

    public static class MapTask2 extends Mapper<LongWritable, Text, IntWritable, Text> {

            public void  map(LongWritable key,Text value, Context context) throws IOException, InterruptedException {

                String tokens[]= value.toString().split("\\t");

            //    int empId = Integer.parseInt(tokens[0])    ;    
                int count = Integer.parseInt(tokens[2])    ;

                context.write(new IntWritable(count), new Text(value));

            }    

        }


    public static class ReduceTask2 extends Reducer<IntWritable, Text, IntWritable, Text> {
        int cnt=0;
        public void reduce(IntWritable key, Iterable<Text> list, Context context)
                throws java.io.IOException, InterruptedException {


            for (Text value : list ) {
                cnt ++;

                if (cnt==11)
                {
                    break;    
                }

                context.write(new IntWritable(cnt), value);




            }

        }
}
}  
package comparableTest;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class TestData {

    //static int cnt=0;


    public static void main(String args[]) throws IOException, InterruptedException {

        ArrayList<String> list = new ArrayList<String>() {{
            add("A");
            add("B");
            add("C");
            add("D");
        }};


        reduce(list);


    }

    public static void reduce(Iterable<String> list)
            throws java.io.IOException, InterruptedException {


        int cnt=0;
        for (String value : list ) {
            cnt ++;

            if (cnt==3)
            {
                break;    
            }

            System.out.println(value);    


        }

    }
}

ID '의 NAME 카운트 (상위 10 소계 표시해야)

(1995) 2077 (1) 토이 스토리

흰뺨 10 (1995) 888

(100) 시청 (1996) (128)

응결 1,000 (1996) 20

1001 준의 (어소) (1982) 0

1002 에드의 다음 이동 (1996) 8

1,003 극단적 측정 값 (1996) 121

1,004 리머 사람은 (1996) 101

1005 D3 : 마이티 오리 (1996) 142

1,006 챔버는 (1996) 78

1007 애플 덤 플링 갱의 (1975) (232)

1008 데이비 크로켓, 야생 국경의 왕 (1955 년) 97

마녀 산에 1,009 탈출 (1975) 291

병 (101)의 로켓 (1996) 253

1,010 러브 버그의 (1969) 242

1011 허비 (1974) (135) 다시 놀이기구

1012 올드 Yeller (1957) (301)

1,013 상위 트랩 제 (1961) 258

낙천적 1,014 (1960) 136

바운드 1,015 귀로 : 인크 여행 (1993) 234

1,016 개 얽히고 상기 (1959) 156

1017 스위스 패밀리 로빈슨 (1960) (276)

1018 터무니 고양이 그! (1965) 123

바다 (1954) 575에서 1019 만 개 리그

102 씨 틀린 (1996) 60

1,020 쿨 러닝 (1993) 392

외야 (1994) 247에서 1,021 천사

1,022 신데렐라 (1950) 577

1023 푸우와 세차게 몰아 치는 날 (1968) (221)

1,024 세 Caballeros를 상기 (1945) 126

스톤에서 1025 검의 (1963) 293

내 마음에 1026 그래서 친애하는 (1949) 8

1027 로빈 후드 : 도둑의 왕자 (1991) (344)

(1964) 1011 (1028) 메리 포핀스

1029 덤보 (1941) 568

잊을 103 (1996) 33

1030 피트의 드래곤 (1977) (323)

1031 Bedknobs 및 빗자루 (1971) 319`

해결법

  1. ==============================

    1.당신은 INT의 CNT = 0을 이동하는 경우; 줄이거 방법 내부 (이 방법의 첫 번째 문으로), 각 키 (나는 이것이 당신이 원하는 것 같다)의 처음 10 개 값을 얻을 것이다.

    당신은 INT의 CNT = 0을 이동하는 경우; 줄이거 방법 내부 (이 방법의 첫 번째 문으로), 각 키 (나는 이것이 당신이 원하는 것 같다)의 처음 10 개 값을 얻을 것이다.

    그것은 지금처럼 그렇지 않으면, 당신의 카운터는 계속 증가하고 당신은 12을 계속 만 (에 관계없이 키) 11 값을 건너 뜁니다.

    당신이 (상관없이 키) 10 값을 인쇄 할 경우, 당신은이 탄소 나노 튜브 초기화를두고있는 경우 (CNT가> 10)에 경우 상태를 변경 ... 당신이 당신의 알고리즘을 재고해야 할 수 있도록 그러나 이것은 좋은 방법이 아닙니다. (당신이 1 개 이상 감속기 및 해시 파티션 프로그램이있을 때, 분산 환경에서 먼저 처리 될 키 아는 방법, 당신은 10 개 임의의 값을 원하지 않는 가정?)

  2. from https://stackoverflow.com/questions/46087100/counter-is-not-working-in-reducer-code by cc-by-sa and MIT license