복붙노트

[HADOOP] Mapper를 Multi thread Mapper로 교체 할 때 키와 키가 일치하지 않는 유형

HADOOP

Mapper를 Multi thread Mapper로 교체 할 때 키와 키가 일치하지 않는 유형

MapReduce 작업을 위해 MultithreadMapper를 구현하고 싶습니다.

이를 위해 작업 코드에서 Mapper를 Multi thread Mapper로 대체했습니다.

내가 얻는 예외는 다음과 같습니다.

java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, recieved org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:862)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:549)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$SubMapRecordWriter.write(MultithreadedMapper.java:211)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$MapRunner.run(MultithreadedMapper.java:264)

코드 설정은 다음과 같습니다.

 public static void main(String[] args) {
    try {
        if (args.length != 2) {
            System.err.println("Usage: MapReduceMain <input path> <output path>");
            System.exit(123);
        }
        Job job = new Job();
        job.setJarByClass(MapReduceMain.class);
        job.setInputFormatClass(TextInputFormat.class);
        FileSystem fs = FileSystem.get(URI.create(args[0]), job.getConfiguration());
        FileStatus[] files = fs.listStatus(new Path(args[0]));
        for(FileStatus sfs:files){
            FileInputFormat.addInputPath(job, sfs.getPath());
        }
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MyMultithreadMapper.class);
        job.setReducerClass(MyReducer.class);
        MultithreadedMapper.setNumberOfThreads(job, MyMultithreadMapper.nThreads);

        job.setOutputKeyClass(IntWritable.class); 
        job.setOutputValueClass(MyPage.class);

        job.setOutputFormatClass(SequenceFileOutputFormat.class);//write the result as sequential file

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

그리고 매퍼의 코드는 다음과 같습니다.

public class MyMultithreadMapper extends MultithreadedMapper<LongWritable, Text, IntWritable, MyPage> {

ConcurrentLinkedQueue<MyScraper>    scrapers    = new ConcurrentLinkedQueue<MyScraper>();

public static final int             nThreads    = 5;

public MyMultithreadMapper() {
    for (int i = 0; i < nThreads; i++) {
        scrapers.add(new MyScraper());
    }
}

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    MyScraper scraper = scrapers.poll();

    MyPage result = null;
    for (int i = 0; i < 10; i++) {
        try {
            result = scraper.scrapPage(value.toString(), true);
            break;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    if (result == null) {
        result = new MyPage();
        result.setUrl(key.toString());
    }

    context.write(new IntWritable(result.getUrl().hashCode()), result);

    scrapers.add(scraper);
}

내가 왜 이걸 얻었 니?

해결법

  1. ==============================

    1.수행해야 할 작업은 다음과 같습니다.

    수행해야 할 작업은 다음과 같습니다.

    MultithreadedMapper.setMapperClass (job, MyMapper.class);

    MyMapper는 맵 로직을 구현해야합니다

    MultithreadMapper는 비어 있어야합니다

  2. from https://stackoverflow.com/questions/7563353/type-mismatch-in-key-from-map-when-replacing-mapper-with-multithreadmapper by cc-by-sa and MIT license