[HADOOP] Hadoop은 Datajoin을 사용하여 측면 조인을 줄입니다.
HADOOPHadoop은 Datajoin을 사용하여 측면 조인을 줄입니다.
축소 측 결합을 수행하려면 다음 코드를 사용하고 있습니다.
/*
* HadoopMapper.java
*
* Created on Apr 8, 2012, 5:39:51 PM
*/
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
// import org.apache.commons.logging.Log;
// import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.*;
/**
*
* @author
*/
public class DataJoin extends Configured implements Tool
{
public static class MapClass extends DataJoinMapperBase
{
protected Text generateInputTag(String inputFile)
{
String datasource = inputFile.split("-")[0];
return new Text(datasource);
}
protected Text generateGroupKey(TaggedMapOutput aRecord)
{
String line = ((Text) aRecord.getData()).toString();
String[] tokens = line.split(",");
String groupKey = tokens[0];
return new Text(groupKey);
}
protected TaggedMapOutput generateTaggedMapOutput(Object value)
{
TaggedWritable retv = new TaggedWritable((Text) value);
retv.setTag(this.inputTag);
return retv;
}
}
public static class Reduce extends DataJoinReducerBase
{
protected TaggedMapOutput combine(Object[] tags, Object[] values)
{
if (tags.length < 2) return null;
String joinedStr = "";
for (int i=0; i<values.length; i++)
{
if (i > 0) joinedStr += ",";
TaggedWritable tw = (TaggedWritable) values[i];
String line = ((Text) tw.getData()).toString();
String[] tokens = line.split(",", 2);
joinedStr += tokens[1];
}
TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
retv.setTag((Text) tags[0]);
return retv;
}
}
public static class TaggedWritable extends TaggedMapOutput
{
private Writable data;
public TaggedWritable(Writable data)
{
this.tag = new Text("");
this.data = data;
}
public Writable getData()
{
return data;
}
public void write(DataOutput out) throws IOException
{
this.tag.write(out);
this.data.write(out);
}
public void readFields(DataInput in) throws IOException
{
this.tag.readFields(in);
this.data.readFields(in);
}
}
public int run(String[] args) throws Exception
{
Configuration conf = getConf();
JobConf job = new JobConf(conf, DataJoin.class);
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2)
{
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("DataJoin");
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TaggedWritable.class);
job.set("mapred.textoutputformat.separator", ",");
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception
{
int res = ToolRunner.run(new Configuration(),
new DataJoin(),
args);
System.exit(res);
}
}
내 코드를 컴파일 할 수있다. 내가 hadoop에서 실행할 때 나는 결합 자로 다음 에러를 얻고있다.
12/04/17 19:59:29 INFO mapred.JobClient: map 100% reduce 27%
12/04/17 19:59:38 INFO mapred.JobClient: map 100% reduce 30%
12/04/17 19:59:47 INFO mapred.JobClient: map 100% reduce 33%
12/04/17 20:00:23 INFO mapred.JobClient: Task Id : attempt_201204061316_0018_r_000000_2, Status : FAILED
java.lang.RuntimeException: java.lang.NoSuchMethodException: DataJoin$TaggedWritable.<init>()
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:115)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:62)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
at org.apache.hadoop.mapred.Task$ValuesIterator.readNextValue(Task.java:1136)
at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:1076)
at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.moveToNext(ReduceTask.java:246)
at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.next(ReduceTask.java:242)
at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.regroup(DataJoinReducerBase.java:106)
내가 hadoop을 실행하기 위해 사용하는 명령은 / hadoop / core / bin / hadoop jar /export/scratch/lopez/Join/DataJoin.jar DataJoin / export / scratch / user / lopez / Join / export / scratch / user / lopez / Join_Output
데이터 Join.jar 파일에는 Data Join $ Tagged Writable 패키지가 있습니다.
일부 포럼을 확인한 결과 비 정적 클래스로 인해 오류가 발생할 수 있음을 알게되었습니다. 내 프로그램에는 정적이 아닌 클래스가 없습니다!
누군가 나를 도울 수 있을까?
크리스 당신이 말한대로 편집 해 줘서 고마워. 두 개의 파일을 가져 오기 위해 코드를 업데이트했습니다. 하지만 같은 오류 메시지가 나타납니다.
나는 같은 메시지를 얻고있다. INFO mapred.FileInputFormat : 처리 할 총 입력 경로 : 2
오류는
Status : FAILED
java.lang.ArrayIndexOutOfBoundsException: 1
at DataJoin$Reduce.combine(DataJoin.java:69)
at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.joinAndCollect(DataJoinReducerBase.java:205)
at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.joinAndCollect(DataJoinReducerBase.java:214)
at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.joinAndCollect(DataJoinReducerBase.java:214)
at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.joinAndCollect(DataJoinReducerBase.java:181)
at org.apache.hadoop.contrib.utils.join.DataJoinReducerBase.reduce(DataJoinReducerBase.java:135)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:468)
{
Configuration conf = getConf();
JobConf job = new JobConf(conf, DataJoin.class);
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 3)
{
System.err.println("Usage: wordcount <in> <in1> <out>");
System.exit(2);
}
Path in = new Path(args[0]);
Path in1 = new Path(args[1]);
Path out = new Path(args[2]);
FileInputFormat.setInputPaths(job,in,in1);
FileOutputFormat.setOutputPath(job, out);
job.setJobName("DataJoin");
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TaggedWritable.class);
job.set("mapred.textoutputformat.separator", ",");
JobClient.runJob(job);
return 0;
}
해결법
-
==============================
1.TaggedWritable에 대한 기본 생성자가 필요합니다 (Hadoop은 리플렉션을 사용하여이 객체를 만들고 기본 생성자 (args 없음)가 필요합니다.
TaggedWritable에 대한 기본 생성자가 필요합니다 (Hadoop은 리플렉션을 사용하여이 객체를 만들고 기본 생성자 (args 없음)가 필요합니다.
또한 readFields 메서드에서 쓰기 가능한 인터페이스에서 data.readFields (in)를 호출했지만 데이터의 실제 런타임 클래스에 대해 알지 못하는 문제가 있습니다.
데이터 객체 자체를 출력하기 전에 데이터 클래스 이름을 작성하거나 GenericWritable 클래스를 살펴볼 것을 권장합니다 (사용할 수있는 쓰기 가능한 허용 가능한 클래스 집합을 정의하기 위해이 클래스를 확장해야합니다).
그래서 당신은 다음과 같이 수정할 수 있습니다 :
public static class TaggedWritable extends TaggedMapOutput { private Writable data; public TaggedWritable() { this.tag = new Text(); } public TaggedWritable(Writable data) { this.tag = new Text(""); this.data = data; } public Writable getData() { return data; } public void setData(Writable data) { this.data = data; } public void write(DataOutput out) throws IOException { this.tag.write(out); out.writeUTF(this.data.getClass().getName()); this.data.write(out); } public void readFields(DataInput in) throws IOException { this.tag.readFields(in); String dataClz = in.readUTF(); if (this.data == null || !this.data.getClass().getName().equals(dataClz)) { this.data = (Writable) ReflectionUtils.newInstance( Class.forName(dataClz), null); } this.data.readFields(in); } }
-
==============================
2.코드는 Text로만 작업하기 때문에 TaggedWritable에 기본 생성자를 연결하면됩니다.
코드는 Text로만 작업하기 때문에 TaggedWritable에 기본 생성자를 연결하면됩니다.
public TaggedWritable() { this(new Text("")); }
from https://stackoverflow.com/questions/10201500/hadoop-reduce-side-join-using-datajoin by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] Spark SQL saveAsTable이 빈 결과를 반환합니다. (0) | 2019.06.12 |
---|---|
[HADOOP] Hive에서 datetime에 분을 추가하십시오. (0) | 2019.06.12 |
[HADOOP] org.apache.hadoop.conf.Configuration은 hadoop-core.jar에 존재하지 않습니다. (0) | 2019.06.12 |
[HADOOP] Hadoop ClassNotFoundException (0) | 2019.06.11 |
[HADOOP] Spark 작업으로 제출 될 때 Spark RDD 맵의 NullPointerException (0) | 2019.06.11 |