복붙노트

[HADOOP] 하둡 - 감속기가 시작되지 않는

HADOOP

하둡 - 감속기가 시작되지 않는

내 노트북 ​​(OSX)에 설치 의사 분산 운영 - 나는 KNN은 단일 노드 클러스터에 대한 하둡 2.6.0에 맵리 듀스 hbrj 알고리즘을 결합 오픈 소스를 실행하려합니다. 이 코드입니다.

매퍼, 감속기 메인 드라이버 :

public class RPhase2 extends Configured implements Tool 
{
    public static class MapClass extends MapReduceBase 
    implements Mapper<LongWritable, Text, IntWritable, RPhase2Value> 
    {
        public void map(LongWritable key, Text value, 
        OutputCollector<IntWritable, RPhase2Value> output, 
        Reporter reporter)  throws IOException 
        {
            String line = value.toString();
            String[] parts = line.split(" +");
            // key format <rid1>
            IntWritable mapKey = new IntWritable(Integer.valueOf(parts[0]));
            // value format <rid2, dist>
            RPhase2Value np2v = new RPhase2Value(Integer.valueOf(parts[1]), Float.valueOf(parts[2]));
            System.out.println("############### key:  " + mapKey.toString() + "   np2v:  " + np2v.toString());
            output.collect(mapKey, np2v);
        }
    }

    public static class Reduce extends MapReduceBase
    implements Reducer<IntWritable, RPhase2Value, NullWritable, Text> 
    {
        int numberOfPartition;  
        int knn;

        class Record {...}

        class RecordComparator implements Comparator<Record> {...}

        public void configure(JobConf job) 
        {
            numberOfPartition = job.getInt("numberOfPartition", 2); 
            knn = job.getInt("knn", 3);
            System.out.println("########## configuring!");
        }   

        public void reduce(IntWritable key, Iterator<RPhase2Value> values, 
        OutputCollector<NullWritable, Text> output, 
        Reporter reporter) throws IOException 
        {
            //initialize the pq
            RecordComparator rc = new RecordComparator();
            PriorityQueue<Record> pq = new PriorityQueue<Record>(knn + 1, rc);

            System.out.println("Phase 2 is at reduce");
            System.out.println("########## key: " + key.toString());

            // For each record we have a reduce task
            // value format <rid1, rid2, dist>
            while (values.hasNext()) 
            {
                RPhase2Value np2v = values.next();

                int id2 = np2v.getFirst().get();
                float dist = np2v.getSecond().get();
                Record record = new Record(id2, dist);
                pq.add(record);
                if (pq.size() > knn)
                    pq.poll();
            }

            while(pq.size() > 0) 
            {
                output.collect(NullWritable.get(), new Text(key.toString() + " " + pq.poll().toString()));
                //break; // only ouput the first record
            }

        } // reduce
    } // Reducer

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), RPhase2.class);
        conf.setJobName("RPhase2");

        conf.setMapOutputKeyClass(IntWritable.class);
        conf.setMapOutputValueClass(RPhase2Value.class);

        conf.setOutputKeyClass(NullWritable.class);
        conf.setOutputValueClass(Text.class);   

        conf.setMapperClass(MapClass.class);        
        conf.setReducerClass(Reduce.class);

        int numberOfPartition = 0;  
        List<String> other_args = new ArrayList<String>();

        for(int i = 0; i < args.length; ++i) 
        {
            try {
                if ("-m".equals(args[i])) {
                    //conf.setNumMapTasks(Integer.parseInt(args[++i]));
                    ++i;
                } else if ("-r".equals(args[i])) {
                    conf.setNumReduceTasks(Integer.parseInt(args[++i]));
                } else if ("-p".equals(args[i])) {
                    numberOfPartition = Integer.parseInt(args[++i]);
                    conf.setInt("numberOfPartition", numberOfPartition);
                } else if ("-k".equals(args[i])) {
                    int knn = Integer.parseInt(args[++i]);
                    conf.setInt("knn", knn);
                    System.out.println(knn + "~ hi");
                } else {
                    other_args.add(args[i]);
                }
                conf.setNumReduceTasks(numberOfPartition * numberOfPartition);
                //conf.setNumReduceTasks(1);
            } catch (NumberFormatException except) {
                System.out.println("ERROR: Integer expected instead of " + args[i]);
                return printUsage();
            } catch (ArrayIndexOutOfBoundsException except) {
                System.out.println("ERROR: Required parameter missing from " + args[i-1]);
                return printUsage();
            }
        } 


        FileInputFormat.setInputPaths(conf, other_args.get(0));
        FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new RPhase2(), args);
    }
} // RPhase2

내가 이것을 실행하면 매퍼는 성공하지만 작업이 갑자기 종료하고, 감속기가 인스턴스화하지 않습니다. 또한, 오류는 지금 (심지어 로그 파일에) 인쇄됩니다. 또한 있기 때문에 감속기 결코를의 구성에서 인쇄 문이 인쇄되는 것을 알고있다. 산출:

15/06/15 14:00:37 INFO mapred.LocalJobRunner: map task executor complete.
15/06/15 14:00:38 INFO mapreduce.Job:  map 100% reduce 0%
15/06/15 14:00:38 INFO mapreduce.Job: Job job_local833125918_0001 completed successfully
15/06/15 14:00:38 INFO mapreduce.Job: Counters: 20
    File System Counters
        FILE: Number of bytes read=12505456
        FILE: Number of bytes written=14977422
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=11408
        HDFS: Number of bytes written=8724
        HDFS: Number of read operations=216
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=99
    Map-Reduce Framework
        Map input records=60
        Map output records=60
        Input split bytes=963
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=14
        Total committed heap usage (bytes)=1717567488
    File Input Format Counters 
        Bytes Read=2153
    File Output Format Counters 
        Bytes Written=1645

내가 지금까지했던 :

아무도 내가 문제가 내가 아주 감사 것이 무엇인지 찾기 위해 시도 할 수 있습니다 무엇에 어떤 힌트이나 제안이있는 경우!

여기에 이전에 내 문제에 대한 질문을 게시 그냥 메모 (하둡 KNN은 100 % 0 %를 줄일지도에 붙어 알고리즘을 결합)하지만 내가 원한 때문에 그것은 충분한주의를하지 않았다 다른 관점에서이 문제를 다시 부탁드립니다. 당신은 내 로그 파일에 대한 자세한 내용은이 링크를 사용할 수 있습니다.

해결법

  1. ==============================

    1.나는이 문제를 알아 낸 그것은 어리석은 일이었다. 위의 코드에서 통지하는 경우, 인수를 읽기 전에 numberOfPartition은 0으로 설정, 및 감속기의 수는 numberOfPartition * numberOfPartition으로 설정됩니다. 나는 사용자로 파티션 매개 변수의 수 (나는 단순히 제공되는 README에서 인수 라인을 붙여 복사 대부분 때문에) 변경하지 않은 때문에 감속기도 시작되지 않을 이유입니다.

    나는이 문제를 알아 낸 그것은 어리석은 일이었다. 위의 코드에서 통지하는 경우, 인수를 읽기 전에 numberOfPartition은 0으로 설정, 및 감속기의 수는 numberOfPartition * numberOfPartition으로 설정됩니다. 나는 사용자로 파티션 매개 변수의 수 (나는 단순히 제공되는 README에서 인수 라인을 붙여 복사 대부분 때문에) 변경하지 않은 때문에 감속기도 시작되지 않을 이유입니다.

  2. from https://stackoverflow.com/questions/30853748/hadoop-a-reducer-is-not-being-initiated by cc-by-sa and MIT license