[HADOOP] Hadoop 다중 입력
HADOOPHadoop 다중 입력
hadoop map reduce를 사용하고 있으며 두 개의 파일을 계산하려고합니다. 첫 번째 Map / Reduce 반복은 다음과 같이 쌍 ID 번호가있는 파일을 제공합니다.
A 30
D 20
내 목표는 파일에서 ID를 사용하여 다른 파일과 연결하고 다음과 같이 ID, 숫자, 이름과 같은 다른 출력을 트리오로 사용하는 것입니다.
A ABC 30
D EFGH 20
그러나 Map Reduce를 사용하는 것이 최선의 방법인지 확실하지 않습니다. 예를 들어 파일 판독기를 사용하여 두 번째 입력 파일을 읽고 ID로 이름을 얻는 것이 더 낫지 않습니까? Map Reduce로 할 수 있습니까?
그렇다면, 나는 어떻게 발견하려고 노력하고있다. 나는 MultipleInput 솔루션을 시도했다 :
MultipleInputs.addInputPath(job2, new Path(args[1]+"-tmp"),
TextInputFormat.class, FlightsByCarrierMapper2.class);
MultipleInputs.addInputPath(job2, new Path("inputplanes"),
TextInputFormat.class, FlightsModeMapper.class);
그러나 두 가지를 결합하여 원하는 출력을 얻는 방법은 생각할 수 없습니다. 지금 당장 가지고있는 방법은이 예제와 같은 목록을 나에게 줄뿐입니다.
A ABC
A 30
B ABCD
C ABCDEF
D EFGH
D 20
나의 마지막 감속 후에 나는 이것을 얻고있다 :
N125DL 767-332
N125DL 7 ,
N126AT 737-76N
N126AT 19 ,
N126DL 767-332
N126DL 1 ,
N127DL 767-332
N127DL 7 ,
N128DL 767-332
N128DL 3
나는 이것을 원한다 : N127DL 7 767-332. 또한, 나는 결합하지 않는 것을 원하지 않습니다.
그리고 이것은 제 수업입니다.
공용 클래스 FlightsByCarrierReducer2는 감속기를 확장합니다 {
String merge = "";
protected void reduce(Text token, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
int i = 0;
for(Text value:values)
{
if(i == 0){
merge = value.toString()+",";
}
else{
merge += value.toString();
}
i++;
}
context.write(token, new Text(merge));
}
}
최신 정보:
http://stat-computing.org/dataexpo/2009/the-data.html 이것이 제가 사용하고있는 예입니다.
나는 노력하고있다 : TailNum과 Cancelled는 (1 또는 0) TailNum에 해당하는 모델 이름을 얻는다. 모델이있는 파일에는 TailNumb, Model 및 기타 항목이 있습니다. 내 현재 출력 :
N193JB ERJ 190-100 IGW
N194DN 767-332
N19503 EMB-135ER
N19554 EMB-145LR
N195DN 767-332
N195DN 2
먼저 열쇠가오고, 두 번째 모델은 비행이 취소 된 열쇠이며 모델 아래에 나타납니다.
그리고 모델 당 취소 수를 원하기 때문에 취소 된 모델 번호, 트리오 키를 원합니다.
해결법
-
==============================
1.두 맵퍼의 키로 ID를 사용하여 이들을 조인 할 수 있습니다. 지도 작업을 다음과 같이 작성할 수 있습니다.
두 맵퍼의 키로 ID를 사용하여 이들을 조인 할 수 있습니다. 지도 작업을 다음과 같이 작성할 수 있습니다.
public void map(LongWritable k, Text value, Context context) throws IOException, InterruptedException { //Get the line //split the line to get ID seperate //word1 = A //word2 = 30 //Likewise for A ABC //word1 = A //word2 = ABC context.write(word1, word2); }
나는 당신이 같은지도 작업을 취소 할 수 있다고 생각합니다. 그런 다음 Hadoop Framework가 핵심 기반으로 데이터를 그룹화하는 Reducer 작업을 작성하십시오. 그러면 ID를 키로 사용할 수 있습니다. 그리고 값 중 하나를 캐쉬 할 수 있습니다.
String merge = ""; public void reduce(Text key, Iterable<Text> values, Context context) { int i =0; for(Text value:values) { if(i == 0){ merge = value.toString()+","; } else{ merge += value.toString(); } i++; } valEmit.set(merge); context.write(key, valEmit); }
마지막으로 Driver 클래스를 작성할 수 있습니다.
public int run(String[] args) throws Exception { Configuration c=new Configuration(); String[] files=new GenericOptionsParser(c,args).getRemainingArgs(); Path p1=new Path(files[0]); Path p2=new Path(files[1]); Path p3=new Path(files[2]); FileSystem fs = FileSystem.get(c); if(fs.exists(p3)){ fs.delete(p3, true); } Job job = new Job(c,"Multiple Job"); job.setJarByClass(MultipleFiles.class); MultipleInputs.addInputPath(job, p1, TextInputFormat.class, MultipleMap1.class); MultipleInputs.addInputPath(job,p2, TextInputFormat.class, MultipleMap2.class); job.setReducerClass(MultipleReducer.class); . . }
여기에서 예제를 찾을 수 있습니다.
희망이 도움이됩니다.
최신 정보
입력 1
A 30 D 20
입력 2
A ABC D EFGH
산출
A ABC 30 D EFGH 20
Mapper.java
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * @author sreeveni * */ public class Mapper1 extends Mapper<LongWritable, Text, Text, Text> { Text keyEmit = new Text(); Text valEmit = new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String parts[] = line.split(" "); keyEmit.set(parts[0]); valEmit.set(parts[1]); context.write(keyEmit, valEmit); } }
Reducer.java
import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * @author sreeveni * */ public class ReducerJoin extends Reducer<Text, Text, Text, Text> { Text valEmit = new Text(); String merge = ""; public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String character = ""; String number = ""; for (Text value : values) { // ordering output String val = value.toString(); char myChar = val.charAt(0); if (Character.isDigit(myChar)) { number = val; } else { character = val; } } merge = character + " " + number; valEmit.set(merge); context.write(key, valEmit); } }
드라이버 클래스
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * @author sreeveni * */ public class Driver extends Configured implements Tool { public static void main(String[] args) throws Exception { // TODO Auto-generated method stub // checking the arguments count if (args.length != 3) { System.err .println("Usage : <inputlocation> <inputlocation> <outputlocation> "); System.exit(0); } int res = ToolRunner.run(new Configuration(), new Driver(), args); System.exit(res); } @Override public int run(String[] args) throws Exception { // TODO Auto-generated method stub String source1 = args[0]; String source2 = args[1]; String dest = args[2]; Configuration conf = new Configuration(); conf.set("mapred.textoutputformat.separator", " "); // changing default // delimiter to user // input delimiter FileSystem fs = FileSystem.get(conf); Job job = new Job(conf, "Multiple Jobs"); job.setJarByClass(Driver.class); Path p1 = new Path(source1); Path p2 = new Path(source2); Path out = new Path(dest); MultipleInputs.addInputPath(job, p1, TextInputFormat.class, Mapper1.class); MultipleInputs.addInputPath(job, p2, TextInputFormat.class, Mapper1.class); job.setReducerClass(ReducerJoin.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setOutputFormatClass(TextOutputFormat.class); /* * delete if exist */ if (fs.exists(out)) fs.delete(out, true); TextOutputFormat.setOutputPath(job, out); boolean success = job.waitForCompletion(true); return success ? 0 : 1; } }
-
==============================
2.귀하의 감속기는지도 방법을 가지고 있지만, 합병하는 값의 반복 가능한 집합을 취하는 reduce 방법을 가져야합니다. reduce () 메서드가 없기 때문에 모든 키 / 값 쌍을 통과하는 기본 비헤이비어를 얻습니다.
귀하의 감속기는지도 방법을 가지고 있지만, 합병하는 값의 반복 가능한 집합을 취하는 reduce 방법을 가져야합니다. reduce () 메서드가 없기 때문에 모든 키 / 값 쌍을 통과하는 기본 비헤이비어를 얻습니다.
from https://stackoverflow.com/questions/27349743/hadoop-multiple-inputs by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 3 번의 재시도 후 ZooKeeper가 실패했습니다. (0) | 2019.06.23 |
---|---|
[HADOOP] 하이브, 어떻게 모든 데이터베이스의 테이블 열을 검색합니까? (0) | 2019.06.23 |
[HADOOP] Hive 1.1.0 테이블 파티션 유형을 int에서 string으로 변경합니다. (0) | 2019.06.23 |
[HADOOP] Dataproc 클러스터 시작시 파이썬 라이브러리를 자동으로 설치하려면 어떻게해야합니까? (0) | 2019.06.23 |
[HADOOP] Hadoop Streaming을 LZO 압축 시퀀스 파일과 함께 사용하는 방법? (0) | 2019.06.23 |