[HADOOP] Hadoop MapReduce 작업을 실행할 때 Map의 키 / 값 입력으로 Filename / FileData 가져 오기
HADOOPHadoop MapReduce 작업을 실행할 때 Map의 키 / 값 입력으로 Filename / FileData 가져 오기
Hadoop MapReduce 작업을 실행할 때 MAP의 키 / 값 입력으로 파일 이름 / 파일 내용을 가져 오는 방법에 대한 질문을 들었습니까? 이리. 개념을 설명하기는하지만 코드를 성공적으로 변환 할 수는 없습니다.
기본적으로 파일 이름을 키로, 파일 데이터를 값으로 사용합니다. 이를 위해 앞서 언급 한 질문에서 권장하는대로 사용자 지정 RecordReader를 작성했습니다. 그러나이 클래스에서 파일 이름을 키로 가져 오는 방법을 이해할 수 없었습니다. 또한 사용자 정의 FileInputFormat 클래스를 작성하는 동안 이전에 작성한 사용자 정의 RecordReader를 반환하는 방법을 이해할 수 없었습니다.
RecordReader 코드는 다음과 같습니다.
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class CustomRecordReader extends RecordReader<Text, Text> {
private static final String LINE_SEPARATOR = System.getProperty("line.separator");
private StringBuffer valueBuffer = new StringBuffer("");
private Text key = new Text();
private Text value = new Text();
private RecordReader<Text, Text> recordReader;
public SPDRecordReader(RecordReader<Text, Text> recordReader) {
this.recordReader = recordReader;
}
@Override
public void close() throws IOException {
recordReader.close();
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return recordReader.getProgress();
}
@Override
public void initialize(InputSplit arg0, TaskAttemptContext arg1)
throws IOException, InterruptedException {
recordReader.initialize(arg0, arg1);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (valueBuffer.equals("")) {
while (recordReader.nextKeyValue()) {
valueBuffer.append(recordReader.getCurrentValue());
valueBuffer.append(LINE_SEPARATOR);
}
value.set(valueBuffer.toString());
return true;
}
return false;
}
}
불완전한 FileInputFormat 클래스는 다음과 같습니다.
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
public class CustomFileInputFormat extends FileInputFormat<Text, Text> {
@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
return false;
}
@Override
public RecordReader<Text, Text> getRecordReader(InputSplit arg0, JobConf arg1,
Reporter arg2) throws IOException {
return null;
}
}
해결법
-
==============================
1.이 코드는 CustomRecordReader 클래스에 있습니다.
이 코드는 CustomRecordReader 클래스에 있습니다.
private LineRecordReader lineReader; private String fileName; public CustomRecordReader(JobConf job, FileSplit split) throws IOException { lineReader = new LineRecordReader(job, split); fileName = split.getPath().getName(); } public boolean next(Text key, Text value) throws IOException { // get the next line if (!lineReader.next(key, value)) { return false; } key.set(fileName); value.set(value); return true; } public Text createKey() { return new Text(""); } public Text createValue() { return new Text(""); }
SPDRecordReader 생성자를 제거하십시오 (이는 오류입니다).
그리고 사용자 정의 FileInputFormat 클래스에이 코드가 있습니다.
public RecordReader<Text, Text> getRecordReader( InputSplit input, JobConf job, Reporter reporter) throws IOException { reporter.setStatus(input.toString()); return new CustomRecordReader(job, (FileSplit)input); }
from https://stackoverflow.com/questions/14212453/getting-filename-filedata-as-key-value-input-for-map-when-running-a-hadoop-mapre by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] JAVA API를 사용하여 HDFS에서 파일을 이동하거나 복사하는 방법 (0) | 2019.06.11 |
---|---|
[HADOOP] 오류 JA017로 인해 Oozie Workflow가 실패했습니다. (0) | 2019.06.11 |
[HADOOP] hadoop 클러스터에서 id 생성을 처리하는 방법은 무엇입니까? (0) | 2019.06.11 |
[HADOOP] hadoop - 맵 작업 및 정적 변수 감소 (0) | 2019.06.11 |
[HADOOP] 하이브에서 줄 바꿈 문자 처리 (0) | 2019.06.11 |