복붙노트

[HADOOP] mapreduce composite 키 샘플 - 원하는 출력을 표시하지 않습니다.

HADOOP

mapreduce composite 키 샘플 - 원하는 출력을 표시하지 않습니다.

mapreduce & hadoop 세계에 처음 접했을 때 기본 mapreduce 프로그램을 시험해 본 후 compositekey 샘플 코드를 사용하려고했습니다.

입력 데이터 세트는 다음과 같습니다.

국가, 주, 군, 수백만 인구

미국, CA, 알라 메다, (100)

미국, 캘리포니아, 200 losangels

미국, CA, 새크라멘토, (100)

미국, 플로리다, xxx, 10

미국, 플로리다, yyy, 12

원하는 출력 데이터는 다음과 같아야합니다.

미국, CA (500)

미국, FL, (22)

대신 Country + State 필드가 복합 키를 구성합니다. 나는 다음 출력을 얻고있다. 인구는 어떤 이유로 추가되지 않습니다. 누군가 내가 저의 실수를 지적 할 수 있습니까? 또한 WriteableComparable 인터페이스를 구현하는 Country.java 클래스를 살펴보십시오. 해당 구현에 문제가있을 수 있습니다.

미국, CA (100)

미국, CA (200)

미국, CA (100)

미국, FL, (10)

미국, FL, (12)

국가 + 주마다 인구가 추가되지 않습니다.

이것은 WritableComparable 인터페이스를 구현하는 Country 클래스입니다.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;  
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  

 * The Country class implements WritabelComparator to implements custom    sorting to perform group by operation. It
 * sorts country and then state.
 * 
 */
public class Country implements WritableComparable<Country> {

    Text country;
    Text state;

    public Country(Text country, Text state) {
        this.country = country;
        this.state = state;
    }
    public Country() {
        this.country = new Text();
        this.state = new Text();

    }

    /*
     * (non-Javadoc)
     * 
     * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
     */
    public void write(DataOutput out) throws IOException {
        this.country.write(out);
        this.state.write(out);

    }

    /*
     * (non-Javadoc)
     * 
     * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
     */
    public void readFields(DataInput in) throws IOException {

        this.country.readFields(in);
        this.state.readFields(in);
        ;

    }

    /*
     * (non-Javadoc)
     * 
     * @see java.lang.Comparable#compareTo(java.lang.Object)
     */
    public int compareTo(Country pop) {
        if (pop == null)
            return 0;
        int intcnt = country.compareTo(pop.country);
        if (intcnt != 0) {
            return intcnt;
        } else {
            return state.compareTo(pop.state);

        }
    }

    /*
     * (non-Javadoc)
     * 
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {

        return country.toString() + ":" + state.toString();
    }

}

드라이버 프로그램 :

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  


public class CompositeKeyDriver {

 public static void main(String[] args) throws IOException,    ClassNotFoundException, InterruptedException {


    Configuration conf = new Configuration();

    Job job = Job.getInstance(conf, "CompositeKeyDriver");

    //first argument is job itself
    //second argument is location of the input dataset
    FileInputFormat.addInputPath(job, new Path(args[0]));

    //first argument is the job itself
    //second argument is the location of the output path        
    FileOutputFormat.setOutputPath(job, new Path(args[1]));        


    job.setJarByClass(CompositeKeyDriver.class);

    job.setMapperClass(CompositeKeyMapper.class);

    job.setReducerClass(CompositeKeyReducer.class);

    job.setOutputKeyClass(Country.class);

    job.setOutputValueClass(IntWritable.class);


    //setting the second argument as a path in a path variable           
    Path outputPath = new Path(args[1]);

    //deleting the output path automatically from hdfs so that we don't have delete it explicitly            
    outputPath.getFileSystem(conf).delete(outputPath);


    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}

폴더 프로그램 :

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  


  //  First two parameters are Input Key and Input Value. Input Key =   offset of each line (remember each line is a record). Input value = Line itself
  //  Second two parameters are Output Key and Output value of the Mapper. BTW, the outputs of the mapper are stored in the local file system and not on HDFS. 
  //  Output Key = Country object is sent. Output Value = population in millions in that country + state combination


    public class CompositeKeyMapper extends Mapper<LongWritable, Text, Country, IntWritable> {

    /** The cntry. */
    Country cntry = new Country();

    /** The cnt text. */
    Text cntText = new Text();

    /** The state text. */
    Text stateText = new Text();

    //population in a Country + State
    IntWritable populat = new IntWritable();

    /**
     * 
     * Reducer are optional in Map-Reduce if there is no Reducer defined in program then the output of the Mapper
     * directly write to disk without sorting.
     * 
     */

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //Reader will give each record in a line to the Mapper.
        //That line is split with the de-limiter ","
        String line = value.toString();

        String[] keyvalue = line.split(",");


        //Country is the first item in the line in each record
        cntText.set(new Text(keyvalue[0]));

        //State is the second item in the line in each record
        stateText.set(keyvalue[1]);

        //This is the population. BTW, we can't send Java primitive datatypes into Context object. Java primitive data types are not effective in Serialization and De-serialization.
        //So we have to use the equivalent Writable datatypes provided by mapreduce framework

        populat.set(Integer.parseInt(keyvalue[3]));

        //Here you are creating an object of Country class and in the constructor assigning the country name and state
        Country cntry = new Country(cntText, stateText);

        //Here you are passing the country object and their population to the context object.
        //Remember that country object already implements "WritableComparable" interface which is equivalient to "Comparable" interface in Java. That implementation is in Country.java class
        //Because it implements the WritableComparable interface, the Country objects can be sorted in the shuffle phase. If WritableComparable interface is not implemented, we 
        //can't sort the objects.

        context.write(cntry, populat);

    }
}

감속기 프로그램 :

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  


 //Remember the two output parameters of the Mapper class will become  the first two input parameters to the reducer class.

 public  class CompositeKeyReducer extends Reducer<Country, IntWritable, Country, IntWritable> {

 // The first parameter to reduce method is "Country". The country object has country name and state name (look at the Country.java class for more details.
 // The second parameter "values"   is the collection of population for Country+State (this is a composite Key)

    public void reduce(Country key, Iterator<IntWritable> values, Context context) throws IOException, InterruptedException {

        int numberofelements = 0;

       int cnt = 0;

       while (values.hasNext()) {

            cnt = cnt + values.next().get();

       }

    context.write(key, new IntWritable(cnt));

    }

}

해결법

  1. ==============================

    1.Country 클래스가 hashCode () 메서드를 구현해야하므로 HashPartitioner를 사용하고 있습니다.

    Country 클래스가 hashCode () 메서드를 구현해야하므로 HashPartitioner를 사용하고 있습니다.

    현재는 Object에 대한 기본 hashCode () 구현을 사용하므로 키가 올바르게 그룹화되지 않습니다.

    다음은 hashCode () 메서드의 예입니다.

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((country == null) ? 0 : country.hashCode());
        result = prime * result + ((state == null) ? 0 : state.hashCode());
        return result;
    }
    

    추가 정보:

    안전한면에 있으려면 Text 객체를 설정해야합니다. 현재 국가 생성자에서이 작업을 수행합니다.

    public Country(Text country, Text state) {
        this.country = country;
        this.state = state;
    }
    

    다음과 같이 변경해야합니다.

    public Country(Text country, Text state) {
        this.country.set(country);
        this.state.set(state);
    }
    
  2. ==============================

    2.감속기 문제가 해결되었습니다. 코드를 변경하지 않았습니다. 내가 한 모든 일은 내 Cloudera Hadoop 이미지를 다시 시작하는 것이 었습니다.

    감속기 문제가 해결되었습니다. 코드를 변경하지 않았습니다. 내가 한 모든 일은 내 Cloudera Hadoop 이미지를 다시 시작하는 것이 었습니다.

    디버깅하는 과정에서 다음과 같은 사실을 발견했습니다. 누군가가 이러한 관찰에 대해 언급 할 수 있습니까?

  3. ==============================

    3.나는 Cloudera를 재시작해도 문제가 해결되지 않았지만 Basam과 같은 문제가있었습니다.

    나는 Cloudera를 재시작해도 문제가 해결되지 않았지만 Basam과 같은 문제가있었습니다.

    CompositeKeyReducer 클래스에서 반복자를 Iterable과 다른 몇 줄의 코드로 대체했습니다.

    public void reduce(TextPair key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
    
        int numberofelements = 0;
    
        int cnt = 0;
    
        for (IntWritable value : values) {
            cnt += value.get();
        }
    
        context.write(key, new IntWritable(cnt));
    

    결과 :

  4. from https://stackoverflow.com/questions/37910096/mapreduce-composite-key-sample-doesnt-show-the-desired-output by cc-by-sa and MIT license