복붙노트

[HADOOP] Hadoop Map에서 목록 출력 Custom writable을 사용하여 작업 줄이기

HADOOP

Hadoop Map에서 목록 출력 Custom writable을 사용하여 작업 줄이기

난 hadoop 주어진 wordcount 예제를 변경하여 간단한지도 작업을 줄일려고 노력하고있어.

나는 단어의 수 대신 목록을 쓰려고 노력하고있다. wordcount 예제는 다음과 같은 출력을 제공합니다.

hello 2
world 2

나는 그것을 미래의 작업의 토대가 될리스트로 출력하려고 노력 중이다.

hello 1 1
world 1 1

나는 옳은 길에 있다고 생각하지만 목록 작성에 어려움을 겪고 있습니다. 위의 대신에 나는

Hello   foo.MyArrayWritable@61250ff2
World   foo.MyArrayWritable@483a0ab1

여기 MyArrayWritable입니다. 필자는 sys를 write (DataOuptut arg0)에 넣었지만 아무 것도 출력하지 않으므로이 메서드가 호출되지 않을 수도 있고 그 이유를 알 수 없을 것이라고 생각합니다.

class MyArrayWritable extends ArrayWritable{

public MyArrayWritable(Class<? extends Writable> valueClass, Writable[] values) {
    super(valueClass, values);
}
public MyArrayWritable(Class<? extends Writable> valueClass) {
    super(valueClass);
}

@Override
public IntWritable[] get() {
    return (IntWritable[]) super.get();
}

@Override
public void write(DataOutput arg0) throws IOException {
    for(IntWritable i : get()){
        i.write(arg0);
    }
}
}

편집 - 더 많은 소스 코드 추가

public class WordCount {

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
} 

public static class Reduce extends Reducer<Text, IntWritable, Text, MyArrayWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        ArrayList<IntWritable> list = new ArrayList<IntWritable>();    
        for (IntWritable val : values) {
            list.add(val);
        }
        context.write(key, new MyArrayWritable(IntWritable.class, list.toArray(new IntWritable[list.size()])));
    }
}

public static void main(String[] args) throws Exception {
    if(args == null || args.length == 0)
        args = new String[]{"./wordcount/input","./wordcount/output"};
    Path p = new Path(args[1]);
    FileSystem fs = FileSystem.get(new Configuration());
    fs.exists(p);
    fs.delete(p, true);

    Configuration conf = new Configuration();

    Job job = new Job(conf, "wordcount");
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setJarByClass(WordCount.class);
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
}

}

해결법

  1. ==============================

    1.감속기에 '버그'가 있습니다. 값 반복기는 루프 전체에서 동일한 IntWritable을 다시 사용하므로 다음과 같이 목록에 추가 할 값을 래핑해야합니다.

    감속기에 '버그'가 있습니다. 값 반복기는 루프 전체에서 동일한 IntWritable을 다시 사용하므로 다음과 같이 목록에 추가 할 값을 래핑해야합니다.

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
                                          throws IOException, InterruptedException {
        ArrayList<IntWritable> list = new ArrayList<IntWritable>();    
        for (IntWritable val : values) {
            list.add(new IntWritable(val));
        }
        context.write(key, new MyArrayWritable(IntWritable.class, list.toArray(new IntWritable[list.size()])));
    }
    

    배열리스트를 사용하면서 맵퍼가 단일 값 (하나) 만 출력하지만 실제로이 코드를 확장하면 문제가 될 수 있습니다.

    또한 작업에서 맵 및 감속기 출력 유형이 서로 다른 것을 정의해야합니다.

    // map output types
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    // reducer output types
    
    job.setOutputValueClass(Text.class);
    job.setOutputValueClass(MyArrayWritable.class);
    

    명시 적으로 감속기의 수를 정의하고자 할 수 있습니다 (특히 클러스터 관리자가 기본 x 호를 0으로 정의한 경우, 작업 로그에 sysouts가 기록되는 것을 볼 수없는 이유 일 수 있습니다).

    job.setNumReduceTasks(1);
    

    출력 키 및 값 쌍에서 toString ()을 호출하는 기본 Text 출력 형식을 사용하는 경우 - MyArrayWritable에는 재정의 된 toString 메서드가 없으므로 MyArrayWritable에 값을 입력해야합니다.

    @Override
    public String toString() {
      return Arrays.toString(get());
    }
    

    마지막으로, MyArrayWritable로부터 오버라이드 (override) 된 write 메소드를 삭제합니다. 이것은, readFields 메소드와 호환성이있는 유효한 구현은 아닙니다. 이 메소드를 겹쳐 쓸 필요는 없지만 (sysout이 호출되는지 확인하기를 원할 경우) 대신 다음과 같이하십시오.

    @Override
    public void write(DataOutput arg0) throws IOException {
      System.out.println("write method called");
      super.write(arg0);
    }
    
  2. from https://stackoverflow.com/questions/15810550/output-a-list-from-a-hadoop-map-reduce-job-using-custom-writable by cc-by-sa and MIT license