[HADOOP] Mapreduce Combiner
HADOOPMapreduce Combiner
매퍼, 감속기 및 결합기가있는 간단한 mapreduce 코드가 있습니다. 매퍼의 출력은 결합기로 전달됩니다. 그러나 감속기에는 결합기의 출력 대신 매퍼의 출력이 전달됩니다.
친절하게 도와주세요.
암호:
package Combiner;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class AverageSalary
{
public static class Map extends Mapper<LongWritable, Text, Text, DoubleWritable>
{
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String[] empDetails= value.toString().split(",");
Text unit_key = new Text(empDetails[1]);
DoubleWritable salary_value = new DoubleWritable(Double.parseDouble(empDetails[2]));
context.write(unit_key,salary_value);
}
}
public static class Combiner extends Reducer<Text,DoubleWritable, Text,Text>
{
public void reduce(final Text key, final Iterable<DoubleWritable> values, final Context context)
{
String val;
double sum=0;
int len=0;
while (values.iterator().hasNext())
{
sum+=values.iterator().next().get();
len++;
}
val=String.valueOf(sum)+":"+String.valueOf(len);
try {
context.write(key,new Text(val));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static class Reduce extends Reducer<Text,Text, Text,Text>
{
public void reduce (final Text key, final Text values, final Context context)
{
//String[] sumDetails=values.toString().split(":");
//double average;
//average=Double.parseDouble(sumDetails[0]);
try {
context.write(key,values);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void main(String args[])
{
Configuration conf = new Configuration();
try
{
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: Main <in> <out>");
System.exit(-1); }
Job job = new Job(conf, "Average salary");
//job.setInputFormatClass(KeyValueTextInputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
job.setJarByClass(AverageSalary.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Combiner.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true) ? 0 : -1);
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
해결법
-
==============================
1.그것은 당신이 결합 자의 중요한 속성을 잊어 버린 것 같습니다 :
그것은 당신이 결합 자의 중요한 속성을 잊어 버린 것 같습니다 :
텍스트 / DoubleWritable을 가져 와서 텍스트 / 텍스트를 반환 할 수 없습니다. DoubleWritable 대신 Text를 사용하고 Combiner에서 적절한 구문 분석을 수행 할 것을 제안합니다.
-
==============================
2.Combiners의 # 1 규칙은 다음과 같습니다. 결합자가 실행될 것이라고 가정하지 마십시오. 결합 자만 최적화로서 다루십시오.
Combiners의 # 1 규칙은 다음과 같습니다. 결합자가 실행될 것이라고 가정하지 마십시오. 결합 자만 최적화로서 다루십시오.
Combiner는 모든 데이터를 실행할 수있는 것은 아닙니다. 데이터가 디스크에 쏟아 질 필요가없는 경우, MapReduce는 결합자를 사용하여 건너 뜁니다. 또한 Combiner는 데이터의 하위 집합에 대해 여러 번 실행될 수 있습니다. 그것은 유출 당 한 번 실행됩니다.
당신의 경우에, 당신은이 나쁜 가정을하고 있습니다. Combiner와 Reducer에서 합계를 계산해야합니다.
또한 @ user987339의 대답도 따라야합니다. 결합기의 입력과 출력은 동일해야합니다 (텍스트, 이중 -> 텍스트, 이중). 매퍼의 출력과 감속기의 입력과 일치해야합니다.
-
==============================
3.combine 함수가 사용되면 reduce 함수와 동일한 형식입니다 (그리고 Reducer의 구현), 출력 유형은 중간 키와 값 유형 (K2 및 V2)이므로 reduce 함수를 제공 할 수 있습니다. 지도 : (K1, V1) → 목록 (K2, V2) 결합 : (K2,리스트 (V2)) →리스트 (K2, V2) reduce : (K2, list (V2)) → list (K3, V3) 종종 결합 및 감소 함수가 동일하며,이 경우 K3은 다음과 같습니다. K2 및 V3은 V2와 동일합니다.
combine 함수가 사용되면 reduce 함수와 동일한 형식입니다 (그리고 Reducer의 구현), 출력 유형은 중간 키와 값 유형 (K2 및 V2)이므로 reduce 함수를 제공 할 수 있습니다. 지도 : (K1, V1) → 목록 (K2, V2) 결합 : (K2,리스트 (V2)) →리스트 (K2, V2) reduce : (K2, list (V2)) → list (K3, V3) 종종 결합 및 감소 함수가 동일하며,이 경우 K3은 다음과 같습니다. K2 및 V3은 V2와 동일합니다.
-
==============================
4.Mapreduce를 실행할 때 Combiner가 항상 작동하지 않습니다.
Mapreduce를 실행할 때 Combiner가 항상 작동하지 않습니다.
최소 3 개의 스필 파일 (로컬 디스크에 작성된 매퍼 출력)이 있으면 combiner가 실행되어 노드 크기를 줄이기 위해 쉽게 전송할 수 있도록 파일 크기를 줄일 수 있습니다.
결합기가 실행해야하는 유출 횟수는 min.num.spills.for.combine 속성을 통해 설정할 수 있습니다
from https://stackoverflow.com/questions/20212884/mapreduce-combiner by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] hadoop에서 nameservice에 대한 활성 namenode를 가져 오는 명령은 무엇입니까? (0) | 2019.06.14 |
---|---|
[HADOOP] Hadoop mapreduce : MapReduce 작업 내에서 매퍼를 연결하는 드라이버 (0) | 2019.06.14 |
[HADOOP] hadoop hdfs 형식화가 블록 풀에서 오류를 가져 오지 못했습니다. (0) | 2019.06.14 |
[HADOOP] Cygwin을 통한 Windows에서의 Hadoop 설정 (0) | 2019.06.14 |
[HADOOP] Hadoop : start-dfs.sh 권한이 거부되었습니다. (0) | 2019.06.14 |