복붙노트

[HADOOP] MapReduce를 사용하여 숫자의 평균 찾기

HADOOP

MapReduce를 사용하여 숫자의 평균 찾기

MapReduce를 사용하여 숫자의 평균을 찾기위한 코드를 작성하려고했습니다.

글로벌 카운터를 사용하여 목표에 도달하려고하지만 Mapper의 맵 메소드에서 카운터 값을 설정할 수 없으며 Reducer의 reduce 메소드에서 카운터 값을 검색 할 수 없습니다.

지도에서 어쨌든 글로벌 카운터를 사용해야합니까 (예 : 제공된 Reporter의 incrCounter (키, 금액) 사용)? 아니면 다른 논리를 제안하여 몇 가지 숫자의 평균을 구 하시겠습니까?

해결법

  1. ==============================

    1.논리는 매우 간단합니다. 모든 숫자가 같은 키를 가지고 있다면, 매퍼는 당신이 그 같은 키로 평균을 찾고자하는 모든 값을 보냈습니다. 이 때문에 감속기에서 반복기의 값을 합계 할 수 있습니다. 그런 다음 반복기가 작동하는 횟수에 따라 카운터를 유지할 수 있으므로 평균화 할 항목 수의 문제를 해결할 수 있습니다. 마지막으로 이터레이터 이후에 합계를 항목 수로 나눔으로써 평균을 찾을 수 있습니다.

    논리는 매우 간단합니다. 모든 숫자가 같은 키를 가지고 있다면, 매퍼는 당신이 그 같은 키로 평균을 찾고자하는 모든 값을 보냈습니다. 이 때문에 감속기에서 반복기의 값을 합계 할 수 있습니다. 그런 다음 반복기가 작동하는 횟수에 따라 카운터를 유지할 수 있으므로 평균화 할 항목 수의 문제를 해결할 수 있습니다. 마지막으로 이터레이터 이후에 합계를 항목 수로 나눔으로써 평균을 찾을 수 있습니다.

    이 논리는 결합 자 클래스가 감속기와 같은 클래스로 설정된 경우에는 작동하지 않습니다.

  2. ==============================

    2.3 Mapper / Combiner / Reducer를 모두 사용하여 문제를 해결하십시오. 전체 코드 및 설명은 아래 링크를 참조하십시오.

    3 Mapper / Combiner / Reducer를 모두 사용하여 문제를 해결하십시오. 전체 코드 및 설명은 아래 링크를 참조하십시오.

    http://alchemistviews.blogspot.com/2013/08/calculate-average-in-map-reduce-using.html

  3. ==============================

    3.평균은 합계 / 크기입니다. sum이 sum = k1 + k2 + k3 + ...와 같은 경우, 합산 후 또는 합산 중에 크기로 나눌 수 있습니다. 따라서 평균은 또한 k1 / size + k2 / size + k3 / size + ...입니다.

    평균은 합계 / 크기입니다. sum이 sum = k1 + k2 + k3 + ...와 같은 경우, 합산 후 또는 합산 중에 크기로 나눌 수 있습니다. 따라서 평균은 또한 k1 / size + k2 / size + k3 / size + ...입니다.

    Java 8 코드는 간단합니다.

        public double average(List<Valuable> list) {
          final int size = list.size();
          return list
                .stream()
                .mapToDouble(element->element.someValue())
                .reduce(0,(sum,x)->sum+x/size);
        }
    

    따라서 먼저 목록의 각 요소 값을 두 배로 매핑 한 다음 reduce 함수를 통해 합계합니다.

  4. ==============================

    4.산술 평균은 분포가 아니고 대수적 인 집합 함수입니다. 한 외 (Han et al. 집계 함수는 다음 경우에 분포합니다.

    산술 평균은 분포가 아니고 대수적 인 집합 함수입니다. 한 외 (Han et al. 집계 함수는 다음 경우에 분포합니다.

    또는 다른 말로 표현하자면, 연관성 있고 통용 적이어야합니다. 그러나 집계 함수는 Han et al에 따르면 대수적이다. 만약:

    산술 평균의 경우 이것은 avg = sum / count입니다. 분명히 추가 계산을 수행해야합니다. 그러나 글로벌 카운터를 사용하는 것은 오용으로 보입니다. API는 다음과 같이 org.apache.hadoop.mapreduce.Counter를 설명합니다.

    카운터는 일반적으로 데이터 처리 중 계산의 일부가 아닌 작업에 대한 통계에 일반적으로 사용되어야합니다.

    따라서 파티션 내에서해야하는 모든 작업은 번호를 추가하고 합계 (합계, 개수)와 함께 개수를 추적하는 것입니다. 간단한 접근법은 와 같은 문자열 일 수 있습니다.

    매퍼에서는 카운트가 항상 1이되고 합계는 원시 값 자체입니다. 맵 파일을 줄이려면 결합자를 사용하고 (sum_1 + ... + sum_n, count_1 + ... + count_n)와 같은 집계를 처리 할 수 ​​있습니다. 이것은 감속기에서 반복되어야하고 최종 계산 합계 / 계산에 의해 완료되어야합니다. 이 방법은 사용 된 키와는 독립적이라는 점을 명심하십시오!

    마지막으로 로스 앤젤레스에서 "평균 범죄 시간"을 계산해야하는 LAPD의 원시 범죄 통계를 사용하는 간단한 예가 있습니다.

    public class Driver extends Configured implements Tool {
        enum Counters {
            DISCARDED_ENTRY
        }
    
        public static void main(String[] args) throws Exception {
            ToolRunner.run(new Driver(), args);
        }
    
        public int run(String[] args) throws Exception {
            Configuration configuration = getConf();
    
            Job job = Job.getInstance(configuration);
            job.setJarByClass(Driver.class);
    
            job.setMapperClass(Mapper.class);
            job.setMapOutputKeyClass(LongWritable.class);
            job.setMapOutputValueClass(Text.class);
    
            job.setCombinerClass(Combiner.class);
            job.setReducerClass(Reducer.class);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(Text.class);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            return job.waitForCompletion(true) ? 0 : -1;
        }
    }
    
    public class Mapper extends org.apache.hadoop.mapreduce.Mapper<
        LongWritable,
        Text,
        LongWritable,
        Text
    > {
    
        @Override
        protected void map(
            LongWritable key,
            Text value,
            org.apache.hadoop.mapreduce.Mapper<
                LongWritable,
                Text,
                LongWritable,
                Text
            >.Context context
        ) throws IOException, InterruptedException {
                // parse the CSV line
                ArrayList<String> values = this.parse(value.toString());
    
                // validate the parsed values
                if (this.isValid(values)) {
    
                    // fetch the third and the fourth column
                    String time = values.get(3);
                    String year = values.get(2)
                        .substring(values.get(2).length() - 4);
    
                    // convert time to minutes (e.g. 1542 -> 942)
                    int minutes = Integer.parseInt(time.substring(0, 2))
                        * 60 + Integer.parseInt(time.substring(2,4));
    
                    // create the aggregate atom (a/n)
                    // with a = time in minutes and n = 1
                    context.write(
                        new LongWritable(Integer.parseInt(year)),
                        new Text(Integer.toString(minutes) + ":1")
                    );
                } else {
                    // invalid line format, so we increment a counter
                    context.getCounter(Driver.Counters.DISCARDED_ENTRY)
                        .increment(1);
                }
        }
    
        protected boolean isValid(ArrayList<String> values) {
            return values.size() > 3 
                && values.get(2).length() == 10 
                && values.get(3).length() == 4;
        }
    
        protected ArrayList<String> parse(String line) {
            ArrayList<String> values = new ArrayList<>();
            String current = "";
            boolean escaping = false;
    
            for (int i = 0; i < line.length(); i++){
                char c = line.charAt(i);
    
                if (c == '"') {
                    escaping = !escaping;
                } else if (c == ',' && !escaping) {
                    values.add(current);
                    current = "";
                } else {
                    current += c;
                }
            }
    
            values.add(current);
    
            return values;
        }
    }
    
    public class Combiner extends org.apache.hadoop.mapreduce.Reducer<
        LongWritable,
        Text,
        LongWritable,
        Text
    > {
    
        @Override
        protected void reduce(
            LongWritable key,
            Iterable<Text> values,
            Context context
        ) throws IOException, InterruptedException {
            Long n = 0l;
            Long a = 0l;
            Iterator<Text> iterator = values.iterator();
    
            // calculate intermediate aggregates
            while (iterator.hasNext()) {
                String[] atom = iterator.next().toString().split(":");
                a += Long.parseLong(atom[0]);
                n += Long.parseLong(atom[1]);
            }
    
            context.write(key, new Text(Long.toString(a) + ":" + Long.toString(n)));
        }
    }
    
    public class Reducer extends org.apache.hadoop.mapreduce.Reducer<
        LongWritable,
        Text,
        LongWritable,
        Text
    > {
    
        @Override
        protected void reduce(
            LongWritable key, 
            Iterable<Text> values, 
            Context context
        ) throws IOException, InterruptedException {
            Long n = 0l;
            Long a = 0l;
            Iterator<Text> iterator = values.iterator();
    
            // calculate the finale aggregate
            while (iterator.hasNext()) {
                String[] atom = iterator.next().toString().split(":");
                a += Long.parseLong(atom[0]);
                n += Long.parseLong(atom[1]);
            }
    
            // cut of seconds
            int average = Math.round(a / n);
    
            // convert the average minutes back to time
            context.write(
                key,
                new Text(
                    Integer.toString(average / 60) 
                        + ":" + Integer.toString(average % 60)
                )
            );
        }
    }
    
  5. from https://stackoverflow.com/questions/10667575/find-the-average-of-numbers-using-mapreduce by cc-by-sa and MIT license