[HADOOP] Hadoop MapReduce: driver for chaining mappers within a MapReduce job
I have a MapReduce job. My map class:
public static class MapClass extends Mapper<Text, Text, Text, LongWritable> {
    public void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // ... (method body not shown in the original post)
    }
}
I want to use ChainMapper:
1. Job job = new Job(conf, "Job with chained tasks");
2. job.setJarByClass(MapReduce.class);
3. job.setInputFormatClass(TextInputFormat.class);
4. job.setOutputFormatClass(TextOutputFormat.class);
5. FileInputFormat.setInputPaths(job, new Path(InputFile));
6. FileOutputFormat.setOutputPath(job, new Path(OutputFile));
7. JobConf map1 = new JobConf(false);
8. ChainMapper.addMapper(
It reports an error at line 8.
1. After a lot of "kung fu", I was able to use ChainMapper/ChainReducer. Thanks to user864846 for the last comment.
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package myPKG;

/*
 * Ajitsen: Sample program for ChainMapper/ChainReducer. This program is a
 * modified version of the WordCount example available in Hadoop-0.18.0.
 * Added ChainMapper/ChainReducer and made it work in Hadoop 1.0.2.
 */
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ChainWordCount extends Configured implements Tool {

    // First mapper in the chain: emits (word, 1) for every token of a line.
    public static class Tokenizer extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output,
                        Reporter reporter) throws IOException {
            String line = value.toString();
            System.out.println("Line:" + line);
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one);
            }
        }
    }

    // Second mapper in the chain: its input types match the Tokenizer's output types.
    public static class UpperCaser extends MapReduceBase
            implements Mapper<Text, IntWritable, Text, IntWritable> {

        public void map(Text key, IntWritable value,
                        OutputCollector<Text, IntWritable> output,
                        Reporter reporter) throws IOException {
            String word = key.toString().toUpperCase();
            System.out.println("Upper Case:" + word);
            output.collect(new Text(word), value);
        }
    }

    // Reducer: sums the counts for each (upper-cased) word.
    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output,
                           Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            System.out.println("Word:" + key.toString() + "\tCount:" + sum);
            output.collect(key, new IntWritable(sum));
        }
    }

    static int printUsage() {
        System.out.println("wordcount <input> <output>");
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), ChainWordCount.class);
        conf.setJobName("wordcount");

        if (args.length != 2) {
            System.out.println("ERROR: Wrong number of parameters: "
                    + args.length + " instead of 2.");
            return printUsage();
        }

        FileInputFormat.setInputPaths(conf, args[0]);
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // Each stage in the chain gets its own JobConf.
        JobConf mapAConf = new JobConf(false);
        ChainMapper.addMapper(conf, Tokenizer.class, LongWritable.class, Text.class,
                Text.class, IntWritable.class, true, mapAConf);

        JobConf mapBConf = new JobConf(false);
        ChainMapper.addMapper(conf, UpperCaser.class, Text.class, IntWritable.class,
                Text.class, IntWritable.class, true, mapBConf);

        JobConf reduceConf = new JobConf(false);
        ChainReducer.setReducer(conf, Reduce.class, Text.class, IntWritable.class,
                Text.class, IntWritable.class, true, reduceConf);

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ChainWordCount(), args);
        System.exit(res);
    }
}
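To make the data flow of the chain above easier to follow, here is a minimal plain-Java model of the same three stages (tokenize, upper-case, sum). It does not use Hadoop at all: the class name ChainWordCountModel and its methods are hypothetical, and the shuffle/sort between map and reduce is approximated with a sorted in-memory map. It only sketches what the chained job computes, not how Hadoop executes it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

/** Plain-Java model of the Tokenizer -> UpperCaser -> Reduce chain (hypothetical, no Hadoop). */
final class ChainWordCountModel {

    /** Stage 1 (like Tokenizer): split a line into whitespace-separated words. */
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            words.add(itr.nextToken());
        }
        return words;
    }

    /** Stage 2 (like UpperCaser): consumes stage 1's output type and upper-cases each key. */
    static List<String> upperCase(List<String> words) {
        List<String> out = new ArrayList<>();
        for (String w : words) {
            // Locale.ROOT is an assumption made here for deterministic output.
            out.add(w.toUpperCase(Locale.ROOT));
        }
        return out;
    }

    /** Stage 3 (like Reduce): add 1 per occurrence of each key. */
    static Map<String, Integer> count(List<String> keys) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String k : keys) {
            counts.merge(k, 1, Integer::sum);
        }
        return counts;
    }

    /** Runs every line through both mapper stages, then reduces all emitted keys. */
    static Map<String, Integer> run(List<String> lines) {
        List<String> keys = new ArrayList<>();
        for (String line : lines) {
            keys.addAll(upperCase(tokenize(line)));
        }
        return count(keys);
    }

    public static void main(String[] args) {
        // prints {CHAIN=1, HADOOP=1, HELLO=2}
        System.out.println(run(Arrays.asList("Hello hadoop", "hello Chain")));
    }
}
```

The point to notice is that each stage's input type must match the previous stage's output type, which is what the key/value class arguments passed to addMapper and setReducer declare in the real job.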
EDIT: In recent versions (at least since Hadoop 2.6), the true flag in addMapper is not needed. (In fact, the signature has changed and no longer includes it.)
So it becomes just:
JobConf mapAConf = new JobConf(false);
ChainMapper.addMapper(conf, Tokenizer.class, LongWritable.class, Text.class, Text.class, IntWritable.class, mapAConf);
2. You have to use Configuration instead of JobConf. JobConf is a subclass of Configuration, so a constructor for that should exist.
3. Actually, the mapper class must implement the org.apache.hadoop.mapred.Mapper interface. I had the same problem, and this solved it.
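The reason this matters can be sketched without Hadoop. The MapClass in the question has a map method taking a Context, which suggests it extends the newer org.apache.hadoop.mapreduce.Mapper class, whereas the old-API ChainMapper.addMapper only accepts classes implementing org.apache.hadoop.mapred.Mapper. The two types are unrelated, so the class is rejected. All type names below are hypothetical stand-ins, not Hadoop classes:

```java
/** Hypothetical stand-in for the old-API interface org.apache.hadoop.mapred.Mapper. */
interface OldApiMapper {}

/** Hypothetical stand-in for the new-API base class org.apache.hadoop.mapreduce.Mapper. */
class NewApiMapper {}

/** Like MapClass in the question: extends the new-API base class. */
class QuestionMapClass extends NewApiMapper {}

/** Like Tokenizer in answer 1: implements the old-API interface. */
class AnswerTokenizer implements OldApiMapper {}

final class ApiMismatchDemo {

    /** Stand-in for an old-API addMapper: the bound admits only OldApiMapper implementations. */
    static String addMapper(Class<? extends OldApiMapper> klass) {
        return "added " + klass.getSimpleName();
    }

    /** Runtime equivalent of the check the compiler performs on the bounded parameter. */
    static boolean acceptedByOldApi(Class<?> klass) {
        return OldApiMapper.class.isAssignableFrom(klass);
    }

    public static void main(String[] args) {
        System.out.println(addMapper(AnswerTokenizer.class));
        // addMapper(QuestionMapClass.class); // would not compile: not an OldApiMapper
        System.out.println(acceptedByOldApi(QuestionMapClass.class)); // false
        System.out.println(acceptedByOldApi(AnswerTokenizer.class));  // true
    }
}
```

In the real job, the consistent choices are either to keep everything on the old API (JobConf, mapred.Mapper, mapred.lib.ChainMapper) or everything on the new one; mixing them produces this kind of type error.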
4. For the first argument of ChainMapper.addMapper(), you passed the job object. The function expects an object of type JobConf. Rewriting it as:
ChainMapper.addMapper((JobConf) conf, MapClass.class, Text.class, Text.class, Text.class, Text.class, true, map1);
should solve the problem.
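One caveat about the cast, sketched in plain Java: a downcast only succeeds when the object really is an instance of the subclass at runtime. The question does not show how conf was created; if it is a plain Configuration, the cast would throw ClassCastException, and building a new subclass instance from it (the constructor route mentioned in answer 2) is the safer option. BaseConf and DerivedConf below are hypothetical stand-ins for Configuration and JobConf, and the copying behaviour shown is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for org.apache.hadoop.conf.Configuration. */
class BaseConf {
    final Map<String, String> props = new HashMap<>();
}

/** Hypothetical stand-in for JobConf: a subclass constructible from the base type. */
class DerivedConf extends BaseConf {
    DerivedConf(BaseConf other) {
        props.putAll(other.props); // copy the settings of the wrapped configuration
    }
}

final class CastVersusCopyDemo {

    /** Returns true only if conf is already a DerivedConf at runtime. */
    static boolean castSucceeds(BaseConf conf) {
        try {
            DerivedConf derived = (DerivedConf) conf;
            return derived != null;
        } catch (ClassCastException e) {
            return false;
        }
    }

    /** Always works: builds a new DerivedConf from any BaseConf. */
    static DerivedConf wrap(BaseConf conf) {
        return new DerivedConf(conf);
    }

    public static void main(String[] args) {
        BaseConf plain = new BaseConf();
        plain.props.put("job.name", "chained");

        System.out.println(castSucceeds(plain));                 // false
        System.out.println(castSucceeds(wrap(plain)));           // true
        System.out.println(wrap(plain).props.get("job.name"));   // chained
    }
}
```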
from https://stackoverflow.com/questions/6840922/hadoop-mapreduce-driver-for-chaining-mappers-within-a-mapreduce-job by cc-by-sa and MIT license
