[HADOOP] 감속기가 <Text, IntWritable> 및 <Text, IntWritable>을 수신 할 매퍼를 내보내도록 설정하는 방법은 무엇입니까?
HADOOP감속기가 및 을 수신 할 매퍼를 내보내도록 설정하는 방법은 무엇입니까?
두 개의 매퍼와 두 개의 감속기를 사용하는 mapreduce와 함께 hadoop에 몇 가지 코드를 개발 중입니다. 나는 SequenceFileInputFormat과 SequenceFileOutputFormat을 사용하여 첫 번째 감속기의 출력과 두 번째 매퍼의 입력을 함께 작동하도록했다. 문제는 내가 오류를 recibing하고 많은 이유를 모르겠 googleing 후입니다.
오류:
코드:
package casoTaxis;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
public class Eje1{
public static class MapperJob1 extends Mapper<Object, Text, Text, IntWritable> {
//El metodo map recibe un conjunto clave-valor, lo procesa y lo vuelca en un contexto.adasdadada
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
Text hackLicense; IntWritable totalAmount; //salidas
StringTokenizer itr = new StringTokenizer(value.toString(), ",");
itr.nextToken();
hackLicense = new Text(itr.nextToken());
for(int i=2; i<itr.countTokens(); i++) itr.nextToken();
totalAmount = new IntWritable( Integer.parseInt(itr.nextToken()) );
context.write(hackLicense, totalAmount);
}
}
public static class ReducerJob1 extends Reducer<Text, IntWritable, Text, IntWritable> { //No encontre una clase InpuFormat que sea Text, IntWritable
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
public static class MapperJob2 extends Mapper<Text, IntWritable, Text, IntWritable> {
//El metodo map recibe un conjunto clave-valor, lo procesa y lo vuelca en un contexto.adasdadada
public void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
context.write(key, value);
}
}
public static class ReducerJob2 extends Reducer<Text, IntWritable, Text, Text> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int max = 0;
for (IntWritable val : values) {
int maxVal = val.get();
if( maxVal>max ) max = maxVal;
}
String licencia = "Conductor con licencia = " + key;
String recaudacion = "Recaudacion = " + max;
context.write(new Text(licencia), new Text(recaudacion));
}
}
public static void main(String[] args) throws Exception {
Configuration conf1 = new Configuration();
Configuration conf2 = new Configuration();
//conf2.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", " ");
Job job1 = Job.getInstance(conf1, "Eje1-Job1");
Job job2 = Job.getInstance(conf2, "Eje1-Job2");
job1.setJarByClass(Eje1.class);
job2.setJarByClass(Eje1.class);
job1.setMapperClass(MapperJob1.class);
job2.setMapperClass(MapperJob2.class);
job1.setReducerClass(ReducerJob1.class);
job2.setReducerClass(ReducerJob2.class);
job1.setMapOutputKeyClass(Text.class);
job1.setMapOutputValueClass(IntWritable.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
job2.setMapOutputKeyClass(Text.class);
job2.setMapOutputKeyClass(IntWritable.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
job1.setOutputFormatClass(SequenceFileOutputFormat.class);
job2.setInputFormatClass(SequenceFileInputFormat.class);///asdasdads
FileInputFormat.addInputPath(job1, new Path(args[0]));
FileOutputFormat.setOutputPath(job1, pathIntermedio);
FileInputFormat.addInputPath(job2, pathIntermedio);
FileOutputFormat.setOutputPath(job2, new Path(args[1]));
job1.waitForCompletion(true);
System.exit(job2.waitForCompletion(true) ? 0 : 1);
}
private static final Path pathIntermedio = new Path("intermediate_output");
}
왜이 오류가 발생합니까? 이것을 달성하는 더 좋은 방법이 있습니까?
해결법
-
==============================
1.라인에 오류가 있습니다.
라인에 오류가 있습니다.
job2.setMapOutputKeyClass(Text.class); job2.setMapOutputKeyClass(IntWritable.class);
그 중 두 번째는 다음과 같아야합니다.
job2.setMapOutputValueClass(IntWritable.class);
from https://stackoverflow.com/questions/40221115/how-to-set-a-a-reducer-to-emmit-text-intwritable-and-a-mapper-to-receive-tex by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] NoSuchMethodError : org.apache.spark.sql.SQLContext.applySchema (0) | 2019.07.02 |
---|---|
[HADOOP] PySpark : newAPIHadoopFile을 사용하여 여러 줄 레코드 텍스트 파일에서 읽고 매핑하고 줄입니다. (0) | 2019.07.02 |
[HADOOP] HADOOP :: java.lang.ClassNotFoundException : WordCount (0) | 2019.07.02 |
[HADOOP] 하이브로 mysql을 설정 한 후 하이브 메타 스토어 서비스 또는 하이브 - 쉘을 시작할 수 없습니다 (0) | 2019.07.02 |
[HADOOP] 이미지의 하둡 및 3D 렌더링 (0) | 2019.07.02 |