[HADOOP] 감속기에서 매퍼 카운터에 액세스
HADOOP감속기에서 매퍼 카운터에 액세스
내 감속재에있는 내 매퍼에서 카운터에 액세스해야합니다. 이것이 가능한가? 그렇다면 어떻게해야할까요?
예로서: 내 매퍼입니다 :
public class CounterMapper extends Mapper<Text,Text,Text,Text> {
static enum TestCounters { TEST }
@Override
protected void map(Text key, Text value, Context context)
throws IOException, InterruptedException {
context.getCounter(TestCounters.TEST).increment(1);
context.write(key, value);
}
}
내 감속기는
public class CounterReducer extends Reducer<Text,Text,Text,LongWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Counter counter = context.getCounter(CounterMapper.TestCounters.TEST);
long counterValue = counter.getValue();
context.write(key, new LongWritable(counterValue));
}
}
counterValue는 항상 0입니다. 제가 잘못한 일을하고 있습니까? 그렇지 않습니다.
해결법
-
==============================
1.감속기의 configure (JobConf)에서 JobConf 객체를 사용하여 감속기 자신의 작업 ID를 조회 할 수 있습니다. 그러면 감속기가 JobClient (예 : jobtracker에 대한 연결)를 만들고이 작업 (또는 해당 작업에 대한 작업)에 대한 카운터를 쿼리 할 수 있습니다.
감속기의 configure (JobConf)에서 JobConf 객체를 사용하여 감속기 자신의 작업 ID를 조회 할 수 있습니다. 그러면 감속기가 JobClient (예 : jobtracker에 대한 연결)를 만들고이 작업 (또는 해당 작업에 대한 작업)에 대한 카운터를 쿼리 할 수 있습니다.
// in the Reducer class... private long mapperCounter; @Override public void configure(JobConf conf) { JobClient client = new JobClient(conf); RunningJob parentJob = client.getJob(JobID.forName( conf.get("mapred.job.id") )); mapperCounter = parentJob.getCounters().getCounter(MAP_COUNTER_NAME); }
이제 reduce () 메소드 자체에서 mapperCounter를 사용할 수 있습니다.
실제로 여기에서 try-catch가 필요합니다. 이전 API를 사용하고 있지만 새 API를 적용하기가 쉽지 않습니다.
감속기가 시작되기 전에 매퍼의 카운터는 모두 마무리되어야한다는 점에 유의하십시오. 저스틴 토마스의 의견과 달리 정확한 값을 얻어야합니다 (감속기가 같은 카운터를 늘리지 않는 한!).
-
==============================
2.새 API에 대한 Jeff G의 솔루션 구현 :
새 API에 대한 Jeff G의 솔루션 구현 :
@Override public void setup(Context context) throws IOException, InterruptedException{ Configuration conf = context.getConfiguration(); Cluster cluster = new Cluster(conf); Job currentJob = cluster.getJob(context.getJobID()); mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME).getValue(); }
-
==============================
3.map / reduce의 핵심은 작업을 병렬 처리하는 것입니다. 많은 맵퍼 / 리듀서가있을 것이므로 map / reduce 쌍의 실행을 제외하고는 값이 정확하지 않을 것입니다.
map / reduce의 핵심은 작업을 병렬 처리하는 것입니다. 많은 맵퍼 / 리듀서가있을 것이므로 map / reduce 쌍의 실행을 제외하고는 값이 정확하지 않을 것입니다.
단어 수를 예로들 수 있습니다.
http://wiki.apache.org/hadoop/WordCount
context.write (word, one)를 context.write (line, one)로 변경할 수 있습니다.
-
==============================
4.전역 카운터 값은 각 매퍼 또는 감속기로 다시 브로드 캐스트되지 않습니다. 감속기에 # 매퍼 레코드를 사용할 수있게하려면이 작업을 수행하기 위해 외부 메커니즘을 사용해야합니다.
전역 카운터 값은 각 매퍼 또는 감속기로 다시 브로드 캐스트되지 않습니다. 감속기에 # 매퍼 레코드를 사용할 수있게하려면이 작업을 수행하기 위해 외부 메커니즘을 사용해야합니다.
-
==============================
5.나는이 질문을했지만 내 문제는 해결하지 못했다. 그러나 대체 솔루션이 내 마음에 왔습니다. 매퍼에서 단어 수를 계산하고 매퍼의 끝을 실행하는 정리 함수에서 최소값 키 (이 값이 머리에 있음)로 중간 출력에 쓸 수 있습니다. 감속기에서 단어의 수는 머리에 값을 추가하여 계산됩니다. 샘플 코드와 그 출력물의 일부는 아래에 있습니다.
나는이 질문을했지만 내 문제는 해결하지 못했다. 그러나 대체 솔루션이 내 마음에 왔습니다. 매퍼에서 단어 수를 계산하고 매퍼의 끝을 실행하는 정리 함수에서 최소값 키 (이 값이 머리에 있음)로 중간 출력에 쓸 수 있습니다. 감속기에서 단어의 수는 머리에 값을 추가하여 계산됩니다. 샘플 코드와 그 출력물의 일부는 아래에 있습니다.
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; import java.util.StringTokenizer; /** * Created by tolga on 1/26/16. */ public class WordCount { static enum TestCounters { TEST } public static class Map extends Mapper<Object, Text, Text, LongWritable> { private final static LongWritable one = new LongWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); context.write(word, one); context.getCounter(TestCounters.TEST).increment(1); } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { context.write(new Text("!"),new LongWritable(context.getCounter(TestCounters.TEST).getValue())); } } public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> { public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (LongWritable val : values) { sum += val.get(); } context.write(key, new LongWritable(sum)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, "WordCount"); job.setJarByClass(WordCount.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } }
텍스트 파일 :
Turgut Özal University is a private university located in Ankara, Turkey. It was established in 2008 by the Turgut Özal Thought and Action Foundation and is named after former Turkish president Turgut Özal.
중간 출력
**! 33 ** 2008 (1) 조치 1 앙카라, 1 재단 1 그것은 1 일을 생각 Turgut 1 Turgut 1 Turgut 1
**! 33 ** 2008 1 액션 1 앙카라, 1 파운데이션 1 그것 1 생각 1 투르 구트 3
-
==============================
6.findCounter (COUNTER_NAME)은 (는) 더 이상 지원되지 않습니다. - https://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/mapred/Counters.html
findCounter (COUNTER_NAME)은 (는) 더 이상 지원되지 않습니다. - https://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/mapred/Counters.html
@Override public void setup(Context context) throws IOException, InterruptedException{ Configuration conf = context.getConfiguration(); Cluster cluster = new Cluster(conf); Job currentJob = cluster.getJob(context.getJobID()); mapperCounter = currentJob.getCounters().findCounter(GROUP_NAME, COUNTER_NAME).getValue(); }
카운터가 호출 될 때 GROUP_NAME이 (가) 지정됩니다. 예 :
context.getCounter("com.example.mycode", "MY_COUNTER").increment(1);
그때
mapperCounter = currentJob.getCounters().findCounter("com.example.mycode", "MY_COUNTER").getValue();
또한 중요한 점은 카운터가 존재하지 않으면 값이 0 인 카운터를 초기화한다는 것입니다.
from https://stackoverflow.com/questions/5450290/accessing-a-mappers-counter-from-a-reducer by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 스파크가 강제로 코드를 실행하도록하려면 어떻게해야합니까? (0) | 2019.05.29 |
---|---|
[HADOOP] Sqoop 가져 오기 : 복합 기본 키 및 텍스트 기본 키 (0) | 2019.05.29 |
[HADOOP] 값을 두 번 반복 (MapReduce) (0) | 2019.05.29 |
[HADOOP] hadoop의 MultipleOutputFormat (0) | 2019.05.29 |
[HADOOP] 로컬 Hadoop 2.6 설치에서 S3 / S3n에 어떻게 액세스합니까? (0) | 2019.05.29 |