복붙노트

[HADOOP] 전역 변수의 값은 루프에 대해 변경되지 않습니다.

HADOOP

전역 변수의 값은 루프에 대해 변경되지 않습니다.

나는 hadoop 프로젝트를 개발 중이다. 나는 특정 일에 고객을 찾고 그 날에 최대 소비량을 가진 사람들을 쓰고 싶다. 내 감속기 클래스에서, 어떤 이유로, 전역 변수 max는 for 루프 후에 값을 변경하지 않습니다.

편집 특정 날짜에 최대 소비량을 가진 고객을 찾고 싶습니다. 나는 원하는 날짜에 고객을 찾을 수 있었지만 내 Reducer 클래스의 문제에 직면하고 있습니다. 다음은 코드입니다.

EDIT # 2 나는 값 (소비)이 자연수라는 것을 이미 알고있다. 그래서 출력 파일에서 나는 최대 소비량을 가진 특정 날의 고객 만이되고 싶다.

편집 # 3 내 입력 파일은 많은 데이터로 구성되어 있습니다. 세 개의 컬럼이 있습니다. 고객 ID, 타임 스탬프 (yyyy-mm-DD HH : mm : ss) 및 소비량

드라이버 클래스

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class alicanteDriver {

    public static void main(String[] args) throws Exception {
        long t_start = System.currentTimeMillis();
        long t_end;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Alicante");
        job.setJarByClass(alicanteDriver.class);

        job.setMapperClass(alicanteMapperC.class);        

        //job.setCombinerClass(alicanteCombiner.class);

        job.setPartitionerClass(alicantePartitioner.class);

        job.setNumReduceTasks(2);

        job.setReducerClass(alicanteReducerC.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("/alicante_1y.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/alicante_output"));
        job.waitForCompletion(true);
        t_end = System.currentTimeMillis();

        System.out.println((t_end-t_start)/1000);
    }
 }

매퍼 클래스

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class alicanteMapperC extends
        Mapper<LongWritable, Text, Text, IntWritable> {

    String Customer = new String();
    SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    Date t = new Date();
    IntWritable Consumption = new IntWritable();
    int counter = 0;

    // new vars
    int max = 0;

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        Date d2 = null;
        try {
            d2 = ft.parse("2013-07-01 01:00:00");
        } catch (ParseException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }

        if (counter > 0) {

            String line = value.toString();
            StringTokenizer itr = new StringTokenizer(line, ",");

            while (itr.hasMoreTokens()) {
                Customer = itr.nextToken();
                try {
                    t = ft.parse(itr.nextToken());
                } catch (ParseException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                Consumption.set(Integer.parseInt(itr.nextToken()));

                //sort out as many values as possible
                if(Consumption.get() > max) {
                    max = Consumption.get();
                }

                //find customers in a certain date
                if (t.compareTo(d2) == 0 && Consumption.get() == max) {
                    context.write(new Text(Customer), Consumption);
                }
            }
        }
        counter++;
    }
}

감속기 클래스

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.google.common.collect.Iterables;

public class alicanteReducerC extends
        Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int max = 0; //this var

        // declaration of Lists
        List<Text> l1 = new ArrayList<Text>();
        List<IntWritable> l2 = new ArrayList<IntWritable>();

        for (IntWritable val : values) {
            if (val.get() > max) {
                max = val.get();
            }
            l1.add(key);
            l2.add(val);
        }

        for (int i = 0; i < l1.size(); i++) {
            if (l2.get(i).get() == max) {
                context.write(key, new IntWritable(max));
            }
        }
    }
}

입력 파일의 일부 값

C11FA586148,2013-07-01 01:00:00,3
C11FA586152,2015-09-01 15:22:22,3
C11FA586168,2015-02-01 15:22:22,1
C11FA586258,2013-07-01 01:00:00,5
C11FA586413,2013-07-01 01:00:00,5
C11UA487446,2013-09-01 15:22:22,3
C11UA487446,2013-07-01 01:00:00,3
C11FA586148,2013-07-01 01:00:00,4

출력이 있어야합니다.

C11FA586258 5
C11FA586413 5

몇 시간 동안 포럼을 검색했지만 여전히 문제를 찾을 수 없습니다. 어떤 아이디어?

해결법

  1. ==============================

    1.리팩토링 된 코드는 다음과 같습니다. 소비 날짜에 대한 특정 값을 전달하거나 변경할 수 있습니다. 이 경우 감속기가 필요하지 않습니다. 내 첫 번째 대답은 입력에서 최대의 comsumption을 쿼리하는 것이 었습니다.이 대답은 사용자가 입력 한 소비량을 쿼리하는 것입니다. setup 메소드는 mapper.maxConsumption.date에 대한 사용자 제공 값을 가져 와서 map 메소드에 전달합니다. 감속기의 클리어 업 방법은 모든 최대 소비 고객을 검사하고 최종 최대 입력 값 (이 경우 5)을 씁니다. 자세한 내용은 스크린 샷을 참조하십시오.

    리팩토링 된 코드는 다음과 같습니다. 소비 날짜에 대한 특정 값을 전달하거나 변경할 수 있습니다. 이 경우 감속기가 필요하지 않습니다. 내 첫 번째 대답은 입력에서 최대의 comsumption을 쿼리하는 것이 었습니다.이 대답은 사용자가 입력 한 소비량을 쿼리하는 것입니다. setup 메소드는 mapper.maxConsumption.date에 대한 사용자 제공 값을 가져 와서 map 메소드에 전달합니다. 감속기의 클리어 업 방법은 모든 최대 소비 고객을 검사하고 최종 최대 입력 값 (이 경우 5)을 씁니다. 자세한 내용은 스크린 샷을 참조하십시오.

    다음과 같이 실행하십시오 :

    hadoop jar maxConsumption.jar -Dmapper.maxConsumption.date="2013-07-01 01:00:00" Data/input.txt output/maxConsupmtion5
    
    #input:
    C11FA586148,2013-07-01 01:00:00,3
    C11FA586152,2015-09-01 15:22:22,3
    C11FA586168,2015-02-01 15:22:22,1
    C11FA586258,2013-07-01 01:00:00,5
    C11FA586413,2013-07-01 01:00:00,5
    C11UA487446,2013-09-01 15:22:22,3
    C11UA487446,2013-07-01 01:00:00,3
    C11FA586148,2013-07-01 01:00:00,4
    
    #output:
    C11FA586258 5
    C11FA586413 5
    
    public class maxConsumption  extends Configured implements Tool{
    
        public static class DataMapper extends Mapper<Object, Text, Text, IntWritable> {
            SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            Date dateInFile, filterDate;
            int lineno=0;
            private final static Text customer = new Text();
            private final static IntWritable consumption = new IntWritable();
            private final static Text maxConsumptionDate = new Text();
    
            public void setup(Context context) {
                Configuration config = context.getConfiguration();
                maxConsumptionDate.set(config.get("mapper.maxConsumption.date"));
            }
    
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
                try{
                    lineno++;
                    filterDate = ft.parse(maxConsumptionDate.toString());
                    //map data from line/file
                    String[] fields = value.toString().split(",");
                    customer.set(fields[0].trim());
                    dateInFile = ft.parse(fields[1].trim());
                    consumption.set(Integer.parseInt(fields[2].trim()));
    
                    if(dateInFile.equals(filterDate)) //only send to reducer if date filter matches....
                        context.write(new Text(customer), consumption);
                }catch(Exception e){
                    System.err.println("Invaid Data at line: " + lineno + " Error: " + e.getMessage());
                }
            }   
        }
    
        public  static class DataReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            LinkedHashMap<String, Integer> maxConsumption = new LinkedHashMap<String,Integer>();
            @Override
            public void reduce(Text key, Iterable<IntWritable> values, Context context) 
                    throws IOException, InterruptedException {
                int max=0;
                System.out.print("reducer received: " + key + " [ ");
                for(IntWritable value: values){
                    System.out.print( value.get() + " ");
                    if(value.get() > max)
                        max=value.get();
                }
                System.out.println( " ]");
                System.out.println(key.toString() + "    max is   " + max);
                maxConsumption.put(key.toString(), max);
            }
    
            @Override
            protected void cleanup(Context context)
                    throws IOException, InterruptedException {
                int max=0;
                //first find the max from reducer
                for (String key : maxConsumption.keySet()){
                     System.out.println("cleaup customer : " + key.toString() + " consumption : " + maxConsumption.get(key) 
                             + " max: " + max); 
                    if(maxConsumption.get(key) > max)
                        max=maxConsumption.get(key);
                }
    
                System.out.println("final max is: " + max);
                //write only the max value from map
                for (String key : maxConsumption.keySet()){
                    if(maxConsumption.get(key) == max)
                        context.write(new Text(key), new IntWritable(maxConsumption.get(key)));
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new maxConsumption(), args);
            System.exit(res);
        }
    
        public int run(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.println("Usage: -Dmapper.maxConsumption.date=\"2013-07-01 01:00:00\" <in> <out>");
                System.exit(2);
            }
            Configuration conf = this.getConf();
            Job job = Job.getInstance(conf, "get-max-consumption");
            job.setJarByClass(maxConsumption.class);
            job.setMapperClass(DataMapper.class);
            job.setReducerClass(DataReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            FileSystem fs = null;
            Path dstFilePath = new Path(args[1]);
            try {
                fs = dstFilePath.getFileSystem(conf);
                if (fs.exists(dstFilePath))
                    fs.delete(dstFilePath, true);
            } catch (IOException e1) {
                e1.printStackTrace();
            }
            return job.waitForCompletion(true) ? 0 : 1;
        } 
    }
    

  2. ==============================

    2.아마도 귀하의 감속기로가는 모든 값은 0 미만입니다. 변수가 변하는 지 확인하려면 최소값을 사용해보십시오.

    아마도 귀하의 감속기로가는 모든 값은 0 미만입니다. 변수가 변하는 지 확인하려면 최소값을 사용해보십시오.

    max = MIN_VALUE;
    

    당신이 말한 것을 기반으로 출력은 오직 0이어야합니다 (이 경우 감속기의 최대 값은 0입니다) 또는 출력이 없습니다 (모든 값은 0보다 작음). 또한 이것을보십시오

    context.write(key, new IntWritable());
    

    그것은해야한다

    context.write(key, new IntWritable(max));
    

    편집 : 방금 당신의 매퍼 클래스를 보았다, 그것은 많은 문제가있다. 다음 코드는 모든 매퍼에서 첫 번째 요소를 건너 뜁니다. 왜?

        if (counter > 0) {
    

    나는 당신이 이처럼 뭔가를 얻고 있다고 생각하니? "customer 2013-07-01 01:00:00, 2, ..."이 경우 값을 이미 필터링하고 있다면 매퍼 범위가 아닌 로컬 변수로 max 변수를 선언해야합니다. 여러 고객.

    이 문제에 관해 많은 질문이 있습니다. 각 매퍼 (mapper)에 대한 입력 내용과 원하는 것을 설명 할 수 있습니다.

    EDIT2 : 귀하의 대답을 바탕으로 이것을 시도 할 것

    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.StringTokenizer;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class AlicanteMapperC extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final int max = 5;
        private SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            Date t = null;
            String[] line = value.toString().split(",");
            String customer = line[0];
            try {
                t = ft.parse(line[1]);
            } catch (ParseException e) {
                // TODO Auto-generated catch block
                throw new RuntimeException("something wrong with the date!" + line[1]);
            }
            Integer consumption = Integer.parseInt(line[2]);
    
            //find customers in a certain date
            if (t.compareTo(ft.parse("2013-07-01 01:00:00")) == 0 && consumption == max) {
                context.write(new Text(customer), new IntWritable(consumption));
            }
            counter++;
        }
    }
    

    그리고 감속기는 고객 당 1 레코드를 방출하기 매우 간단합니다.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import com.google.common.collect.Iterables;
    
    public class AlicanteReducerC extends
            Reducer<Text, IntWritable, Text, IntWritable> {
    
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
    
            //We already now that it is 5
             context.write(key, new IntWritable(5));
             //If you want something different, for example report customer with different values, you could iterate over the iterator like this 
             //for (IntWritable val : values) {
               // context.write(key, new IntWritable(val));
            //}      
        }
    }
    
  3. from https://stackoverflow.com/questions/41635425/global-variables-value-doesnt-change-after-for-loop by cc-by-sa and MIT license