[HADOOP] 전역 변수의 값은 루프에 대해 변경되지 않습니다.
HADOOP전역 변수의 값은 루프에 대해 변경되지 않습니다.
나는 hadoop 프로젝트를 개발 중이다. 나는 특정 일에 고객을 찾고 그 날에 최대 소비량을 가진 사람들을 쓰고 싶다. 내 감속기 클래스에서, 어떤 이유로, 전역 변수 max는 for 루프 후에 값을 변경하지 않습니다.
편집 특정 날짜에 최대 소비량을 가진 고객을 찾고 싶습니다. 나는 원하는 날짜에 고객을 찾을 수 있었지만 내 Reducer 클래스의 문제에 직면하고 있습니다. 다음은 코드입니다.
EDIT # 2 나는 값 (소비)이 자연수라는 것을 이미 알고있다. 그래서 출력 파일에서 나는 최대 소비량을 가진 특정 날의 고객 만이되고 싶다.
편집 # 3 내 입력 파일은 많은 데이터로 구성되어 있습니다. 세 개의 컬럼이 있습니다. 고객 ID, 타임 스탬프 (yyyy-mm-DD HH : mm : ss) 및 소비량
드라이버 클래스
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class alicanteDriver {
public static void main(String[] args) throws Exception {
long t_start = System.currentTimeMillis();
long t_end;
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Alicante");
job.setJarByClass(alicanteDriver.class);
job.setMapperClass(alicanteMapperC.class);
//job.setCombinerClass(alicanteCombiner.class);
job.setPartitionerClass(alicantePartitioner.class);
job.setNumReduceTasks(2);
job.setReducerClass(alicanteReducerC.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("/alicante_1y.txt"));
FileOutputFormat.setOutputPath(job, new Path("/alicante_output"));
job.waitForCompletion(true);
t_end = System.currentTimeMillis();
System.out.println((t_end-t_start)/1000);
}
}
매퍼 클래스
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class alicanteMapperC extends
Mapper<LongWritable, Text, Text, IntWritable> {
String Customer = new String();
SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date t = new Date();
IntWritable Consumption = new IntWritable();
int counter = 0;
// new vars
int max = 0;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
Date d2 = null;
try {
d2 = ft.parse("2013-07-01 01:00:00");
} catch (ParseException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
if (counter > 0) {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line, ",");
while (itr.hasMoreTokens()) {
Customer = itr.nextToken();
try {
t = ft.parse(itr.nextToken());
} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Consumption.set(Integer.parseInt(itr.nextToken()));
//sort out as many values as possible
if(Consumption.get() > max) {
max = Consumption.get();
}
//find customers in a certain date
if (t.compareTo(d2) == 0 && Consumption.get() == max) {
context.write(new Text(Customer), Consumption);
}
}
}
counter++;
}
}
감속기 클래스
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.google.common.collect.Iterables;
public class alicanteReducerC extends
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int max = 0; //this var
// declaration of Lists
List<Text> l1 = new ArrayList<Text>();
List<IntWritable> l2 = new ArrayList<IntWritable>();
for (IntWritable val : values) {
if (val.get() > max) {
max = val.get();
}
l1.add(key);
l2.add(val);
}
for (int i = 0; i < l1.size(); i++) {
if (l2.get(i).get() == max) {
context.write(key, new IntWritable(max));
}
}
}
}
입력 파일의 일부 값
C11FA586148,2013-07-01 01:00:00,3
C11FA586152,2015-09-01 15:22:22,3
C11FA586168,2015-02-01 15:22:22,1
C11FA586258,2013-07-01 01:00:00,5
C11FA586413,2013-07-01 01:00:00,5
C11UA487446,2013-09-01 15:22:22,3
C11UA487446,2013-07-01 01:00:00,3
C11FA586148,2013-07-01 01:00:00,4
출력이 있어야합니다.
C11FA586258 5
C11FA586413 5
몇 시간 동안 포럼을 검색했지만 여전히 문제를 찾을 수 없습니다. 어떤 아이디어?
해결법
-
==============================
1.리팩토링 된 코드는 다음과 같습니다. 소비 날짜에 대한 특정 값을 전달하거나 변경할 수 있습니다. 이 경우 감속기가 필요하지 않습니다. 내 첫 번째 대답은 입력에서 최대의 comsumption을 쿼리하는 것이 었습니다.이 대답은 사용자가 입력 한 소비량을 쿼리하는 것입니다. setup 메소드는 mapper.maxConsumption.date에 대한 사용자 제공 값을 가져 와서 map 메소드에 전달합니다. 감속기의 클리어 업 방법은 모든 최대 소비 고객을 검사하고 최종 최대 입력 값 (이 경우 5)을 씁니다. 자세한 내용은 스크린 샷을 참조하십시오.
리팩토링 된 코드는 다음과 같습니다. 소비 날짜에 대한 특정 값을 전달하거나 변경할 수 있습니다. 이 경우 감속기가 필요하지 않습니다. 내 첫 번째 대답은 입력에서 최대의 comsumption을 쿼리하는 것이 었습니다.이 대답은 사용자가 입력 한 소비량을 쿼리하는 것입니다. setup 메소드는 mapper.maxConsumption.date에 대한 사용자 제공 값을 가져 와서 map 메소드에 전달합니다. 감속기의 클리어 업 방법은 모든 최대 소비 고객을 검사하고 최종 최대 입력 값 (이 경우 5)을 씁니다. 자세한 내용은 스크린 샷을 참조하십시오.
다음과 같이 실행하십시오 :
hadoop jar maxConsumption.jar -Dmapper.maxConsumption.date="2013-07-01 01:00:00" Data/input.txt output/maxConsupmtion5
#input: C11FA586148,2013-07-01 01:00:00,3 C11FA586152,2015-09-01 15:22:22,3 C11FA586168,2015-02-01 15:22:22,1 C11FA586258,2013-07-01 01:00:00,5 C11FA586413,2013-07-01 01:00:00,5 C11UA487446,2013-09-01 15:22:22,3 C11UA487446,2013-07-01 01:00:00,3 C11FA586148,2013-07-01 01:00:00,4 #output: C11FA586258 5 C11FA586413 5
public class maxConsumption extends Configured implements Tool{ public static class DataMapper extends Mapper<Object, Text, Text, IntWritable> { SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date dateInFile, filterDate; int lineno=0; private final static Text customer = new Text(); private final static IntWritable consumption = new IntWritable(); private final static Text maxConsumptionDate = new Text(); public void setup(Context context) { Configuration config = context.getConfiguration(); maxConsumptionDate.set(config.get("mapper.maxConsumption.date")); } public void map(Object key, Text value, Context context) throws IOException, InterruptedException{ try{ lineno++; filterDate = ft.parse(maxConsumptionDate.toString()); //map data from line/file String[] fields = value.toString().split(","); customer.set(fields[0].trim()); dateInFile = ft.parse(fields[1].trim()); consumption.set(Integer.parseInt(fields[2].trim())); if(dateInFile.equals(filterDate)) //only send to reducer if date filter matches.... context.write(new Text(customer), consumption); }catch(Exception e){ System.err.println("Invaid Data at line: " + lineno + " Error: " + e.getMessage()); } } } public static class DataReducer extends Reducer<Text, IntWritable, Text, IntWritable> { LinkedHashMap<String, Integer> maxConsumption = new LinkedHashMap<String,Integer>(); @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int max=0; System.out.print("reducer received: " + key + " [ "); for(IntWritable value: values){ System.out.print( value.get() + " "); if(value.get() > max) max=value.get(); } System.out.println( " ]"); System.out.println(key.toString() + " max is " + max); maxConsumption.put(key.toString(), max); } @Override protected void cleanup(Context context) throws IOException, InterruptedException { int max=0; //first find the max from reducer for (String key : maxConsumption.keySet()){ System.out.println("cleaup customer : " + key.toString() + " consumption : " + maxConsumption.get(key) + " max: " + max); if(maxConsumption.get(key) > max) max=maxConsumption.get(key); } System.out.println("final max is: " + max); //write only the max value from map for (String key : maxConsumption.keySet()){ if(maxConsumption.get(key) == max) context.write(new Text(key), new IntWritable(maxConsumption.get(key))); } } } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new maxConsumption(), args); System.exit(res); } public int run(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: -Dmapper.maxConsumption.date=\"2013-07-01 01:00:00\" <in> <out>"); System.exit(2); } Configuration conf = this.getConf(); Job job = Job.getInstance(conf, "get-max-consumption"); job.setJarByClass(maxConsumption.class); job.setMapperClass(DataMapper.class); job.setReducerClass(DataReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); FileSystem fs = null; Path dstFilePath = new Path(args[1]); try { fs = dstFilePath.getFileSystem(conf); if (fs.exists(dstFilePath)) fs.delete(dstFilePath, true); } catch (IOException e1) { e1.printStackTrace(); } return job.waitForCompletion(true) ? 0 : 1; } }
-
==============================
2.아마도 귀하의 감속기로가는 모든 값은 0 미만입니다. 변수가 변하는 지 확인하려면 최소값을 사용해보십시오.
아마도 귀하의 감속기로가는 모든 값은 0 미만입니다. 변수가 변하는 지 확인하려면 최소값을 사용해보십시오.
max = MIN_VALUE;
당신이 말한 것을 기반으로 출력은 오직 0이어야합니다 (이 경우 감속기의 최대 값은 0입니다) 또는 출력이 없습니다 (모든 값은 0보다 작음). 또한 이것을보십시오
context.write(key, new IntWritable());
그것은해야한다
context.write(key, new IntWritable(max));
편집 : 방금 당신의 매퍼 클래스를 보았다, 그것은 많은 문제가있다. 다음 코드는 모든 매퍼에서 첫 번째 요소를 건너 뜁니다. 왜?
if (counter > 0) {
나는 당신이 이처럼 뭔가를 얻고 있다고 생각하니? "customer 2013-07-01 01:00:00, 2, ..."이 경우 값을 이미 필터링하고 있다면 매퍼 범위가 아닌 로컬 변수로 max 변수를 선언해야합니다. 여러 고객.
이 문제에 관해 많은 질문이 있습니다. 각 매퍼 (mapper)에 대한 입력 내용과 원하는 것을 설명 할 수 있습니다.
EDIT2 : 귀하의 대답을 바탕으로 이것을 시도 할 것
import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.StringTokenizer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class AlicanteMapperC extends Mapper<LongWritable, Text, Text, IntWritable> { private final int max = 5; private SimpleDateFormat ft = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { Date t = null; String[] line = value.toString().split(","); String customer = line[0]; try { t = ft.parse(line[1]); } catch (ParseException e) { // TODO Auto-generated catch block throw new RuntimeException("something wrong with the date!" + line[1]); } Integer consumption = Integer.parseInt(line[2]); //find customers in a certain date if (t.compareTo(ft.parse("2013-07-01 01:00:00")) == 0 && consumption == max) { context.write(new Text(customer), new IntWritable(consumption)); } counter++; } }
그리고 감속기는 고객 당 1 레코드를 방출하기 매우 간단합니다.
import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import com.google.common.collect.Iterables; public class AlicanteReducerC extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //We already now that it is 5 context.write(key, new IntWritable(5)); //If you want something different, for example report customer with different values, you could iterate over the iterator like this //for (IntWritable val : values) { // context.write(key, new IntWritable(val)); //} } }
from https://stackoverflow.com/questions/41635425/global-variables-value-doesnt-change-after-for-loop by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 원격 시스템에서 mapreduce 작업을 제출하는 중에 예외가 발생했습니다. (0) | 2019.07.01 |
---|---|
[HADOOP] Spark가 HDFS 데이터를 읽고 동시에 계산을 수행 할 수 있습니까? (0) | 2019.07.01 |
[HADOOP] 스파크 클라이언트 생성 실패 : 스파크 예외 발생시 하이브 (0) | 2019.07.01 |
[HADOOP] 사육사 오류가 발생하는 로컬 파일 시스템에 HBase를 사용 하시겠습니까? (0) | 2019.07.01 |
[HADOOP] Hadoop 클라이언트에서 Hadoop 서버로 연결할 수 없습니다. (0) | 2019.07.01 |