[HADOOP] Reading a file as a single record in Hadoop
I have a huge number of small files. I want to merge them using CombineFileInputFormat so that each file's data is delivered to my MR job as a single record. I followed http://yaseminavcular.blogspot.in/2011/03/many-small-input-files.html and tried to convert it to the new API.
I am running into 2 problems:
a) I am testing with 2 small files, yet 2 mappers still run. I expected 1.
b) Each line comes in as a separate record; I want the entire file as one record.
It may be painful to look at, but please take a look at the code below. I am still new to this.
The driver class:
public class MRDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
FileSystem fs = new Path(".").getFileSystem(getConf());
fs.printStatistics();
Job job = new Job(getConf());
job.setJobName("Enron MR");
job.setMapperClass(EnronMailReadMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(0);
job.setJarByClass(EnronMailReadMapper.class);
RawCombineFileInputFormat.addInputPath(job, new Path(args[0]));
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 :1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MRDriver(), args);
System.exit(exitCode);
}
}
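As a point of comparison, in the new API the input format normally has to be registered on the Job explicitly; RawCombineFileInputFormat.addInputPath() only adds the path. A minimal sketch of the extra call (assuming the RawCombineFileInputFormat shown further below is the format intended to be used):

    // If the input format class is never set, the job falls back to the
    // default TextInputFormat, which produces one split per file and one
    // record per line - the same symptoms as (a) and (b) above.
    job.setInputFormatClass(RawCombineFileInputFormat.class);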
The class below is mostly a copy-paste of LineRecordReader, with the initialize() and nextKeyValue() functions modified.
public class SingleFileRecordReader extends RecordReader<LongWritable, Text> {
private static final Log LOG = LogFactory.getLog(SingleFileRecordReader.class);
private long start;
private long pos;
private long end;
private LineReader in;
private int maxLineLength;
private LongWritable key = null;
private Text value = null;
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
fileIn.seek(start);
in = new LineReader(fileIn, job);
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}
private int maxBytesToConsume(long pos) {
return (int) Math.min(Integer.MAX_VALUE, end - pos);
}
private long getFilePosition() throws IOException {
long retVal= pos;
return retVal;
}
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;
StringBuffer totalValue = new StringBuffer();
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
while (getFilePosition() <= end) {
newSize = in.readLine(value, maxLineLength,
Math.max(maxBytesToConsume(pos), maxLineLength));
if (newSize == 0) {
break;
}
totalValue.append(value.toString()+"\n");
pos += newSize;
if (newSize < maxLineLength) {
break;
}
// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
value = new Text(totalValue.toString());
return true;
}
}
@Override
public LongWritable getCurrentKey() {
return key;
}
@Override
public Text getCurrentValue() {
return value;
}
/**
* Get the progress within the split
*/
public float getProgress() throws IOException {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f,
(getFilePosition() - start) / (float)(end - start));
}
}
public synchronized void close() throws IOException {
try {
if (in != null) {
in.close();
}
} finally {
}
}
}
The other files:
public class RawCombineFileInputFormat extends CombineFileInputFormat <LongWritable,Text>{
@Override
public RecordReader<LongWritable, Text> createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException {
return new CombineFileRecordReader< LongWritable, Text >((CombineFileSplit) split, context, MultiFileRecordReader.class);
}
}
and
public class MultiFileRecordReader extends RecordReader < LongWritable, Text > {
private CombineFileSplit split;
private TaskAttemptContext context;
private int index;
private RecordReader< LongWritable, Text > rr;
public MultiFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) {
this.split = split;
this.context = context;
this.index = index;
this.rr = new SingleFileRecordReader();
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
this.split = (CombineFileSplit) split;
this.context = context;
if (null == rr) {
rr = new SingleFileRecordReader();
}
FileSplit fileSplit = new FileSplit(this.split.getPath(index),
this.split.getOffset(index),
this.split.getLength(index),
this.split.getLocations());
this.rr.initialize(fileSplit, this.context);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return this.rr.nextKeyValue();
}
@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return this.rr.getCurrentKey();
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return this.rr.getCurrentValue();
}
@Override
public float getProgress() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return this.rr.getProgress();
}
@Override
public void close() throws IOException {
if (rr != null) {
rr.close();
rr = null;
}
}
}
Solution
==============================
1. Take a look at this input format. It is an input format for reading multiple files in a single map task. Each record passed to the mapper consists of exactly one whole (unsplit) file. The WholeFileRecordReader delivers one file's contents as one value. The key returned is a NullWritable, and the value is the entire contents of each file. Now run a mapreduce job with this, check how many mappers actually run, and verify that the output is correct.
The records are produced by WholeFileRecordReaders.
public class WholeFileInputFormat extends CombineFileInputFormat<NullWritable, Text> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    /**
     * Creates a CombineFileRecordReader to read each file assigned to this InputSplit.
     * Note, that unlike ordinary InputSplits, split must be a CombineFileSplit, and therefore
     * is expected to specify multiple files.
     *
     * @param split The InputSplit to read. Throws an IllegalArgumentException if this is
     *              not a CombineFileSplit.
     * @param context The context for this task.
     * @return a CombineFileRecordReader to process each file in split.
     *         It will read each file with a WholeFileRecordReader.
     * @throws IOException if there is an error.
     */
    @Override
    public RecordReader<NullWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException {
        if (!(split instanceof CombineFileSplit)) {
            throw new IllegalArgumentException("split must be a CombineFileSplit");
        }
        return new CombineFileRecordReader<NullWritable, Text>(
                (CombineFileSplit) split, context, WholeFileRecordReader.class);
    }
}
With it, the WholeFileRecordReader can be used as follows:
public class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
    private static final Logger LOG = Logger.getLogger(WholeFileRecordReader.class);

    /** The path to the file to read. */
    private final Path mFileToRead;
    /** The length of this file. */
    private final long mFileLength;
    /** The Configuration. */
    private final Configuration mConf;
    /** Whether this FileSplit has been processed. */
    private boolean mProcessed;

    /** Single Text to store the file name of the current file. */
    // private final Text mFileName;
    /** Single Text to store the value of this file (the value) when it is read. */
    private final Text mFileText;

    /**
     * Implementation detail: This constructor is built to be called via
     * reflection from within CombineFileRecordReader.
     *
     * @param fileSplit The CombineFileSplit that this will read from.
     * @param context The context for this task.
     * @param pathToProcess The path index from the CombineFileSplit to process in this record.
     */
    public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext context,
            Integer pathToProcess) {
        mProcessed = false;
        mFileToRead = fileSplit.getPath(pathToProcess);
        mFileLength = fileSplit.getLength(pathToProcess);
        mConf = context.getConfiguration();

        assert 0 == fileSplit.getOffset(pathToProcess);
        if (LOG.isDebugEnabled()) {
            LOG.debug("FileToRead is: " + mFileToRead.toString());
            LOG.debug("Processing path " + pathToProcess + " out of "
                    + fileSplit.getNumPaths());
            try {
                FileSystem fs = FileSystem.get(mConf);
                assert fs.getFileStatus(mFileToRead).getLen() == mFileLength;
            } catch (IOException ioe) {
                // oh well, I was just testing.
            }
        }

        // mFileName = new Text();
        mFileText = new Text();
    }

    /** {@inheritDoc} */
    @Override
    public void close() throws IOException {
        mFileText.clear();
    }

    /**
     * Returns the absolute path to the current file.
     *
     * @return The absolute path to the current file.
     * @throws IOException never.
     * @throws InterruptedException never.
     */
    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    /**
     * <p>Returns the current value. If the file has been read with a call to NextKeyValue(),
     * this returns the contents of the file as a BytesWritable. Otherwise, it returns an
     * empty BytesWritable.</p>
     *
     * <p>Throws an IllegalStateException if initialize() is not called first.</p>
     *
     * @return A BytesWritable containing the contents of the file to read.
     * @throws IOException never.
     * @throws InterruptedException never.
     */
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return mFileText;
    }

    /**
     * Returns whether the file has been processed or not. Since only one record
     * will be generated for a file, progress will be 0.0 if it has not been processed,
     * and 1.0 if it has.
     *
     * @return 0.0 if the file has not been processed. 1.0 if it has.
     * @throws IOException never.
     * @throws InterruptedException never.
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return (mProcessed) ? (float) 1.0 : (float) 0.0;
    }

    /**
     * All of the internal state is already set on instantiation. This is a no-op.
     *
     * @param split The InputSplit to read. Unused.
     * @param context The context for this task. Unused.
     * @throws IOException never.
     * @throws InterruptedException never.
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // no-op.
    }

    /**
     * <p>If the file has not already been read, this reads it into memory, so that a call
     * to getCurrentValue() will return the entire contents of this file as Text,
     * and getCurrentKey() will return the qualified path to this file as Text. Then, returns
     * true. If it has already been read, then returns false without updating any internal state.</p>
     *
     * @return Whether the file was read or not.
     * @throws IOException if there is an error reading the file.
     * @throws InterruptedException if there is an error.
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!mProcessed) {
            if (mFileLength > (long) Integer.MAX_VALUE) {
                throw new IOException("File is longer than Integer.MAX_VALUE.");
            }
            byte[] contents = new byte[(int) mFileLength];

            FileSystem fs = mFileToRead.getFileSystem(mConf);
            FSDataInputStream in = null;
            try {
                // Set the contents of this file.
                in = fs.open(mFileToRead);
                IOUtils.readFully(in, contents, 0, contents.length);
                mFileText.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            mProcessed = true;
            return true;
        }
        return false;
    }
}
And here is your driver code:
public int run(String[] arg) throws Exception {
    Configuration conf = getConf();
    FileSystem fs = FileSystem.get(conf);

    // estimate reducers
    Job job = new Job(conf);
    job.setJarByClass(WholeFileDriver.class);
    job.setJobName("WholeFile");
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(WholeFileInputFormat.class);
    job.setMapperClass(WholeFileMapper.class);
    job.setNumReduceTasks(0);

    FileInputFormat.addInputPath(job, new Path(arg[0]));
    Path output = new Path(arg[1]);
    try {
        fs.delete(output, true);
    } catch (IOException e) {
        LOG.warn("Failed to delete temporary path", e);
    }
    FileOutputFormat.setOutputPath(job, output);

    boolean ret = job.waitForCompletion(true);
    if (!ret) {
        throw new Exception("Job Failed");
    }
    return 0;
}
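The driver above references a WholeFileMapper that is not shown. A minimal sketch of what it could look like (the class name comes from the driver; the body and the emitted key/value pair are assumptions, chosen only to confirm that each file arrives as a single record):

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WholeFileMapper extends Mapper<NullWritable, Text, Text, Text> {
    @Override
    protected void map(NullWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // value holds the entire contents of one small input file.
        String contents = value.toString();
        // Emit the first line of the file as the key and the content length
        // as the value, just to check that one record is produced per file.
        String firstLine = contents.split("\n", 2)[0];
        context.write(new Text(firstLine), new Text(String.valueOf(contents.length())));
    }
}

Since the driver sets setNumReduceTasks(0), these pairs go straight to the output via the default TextOutputFormat, so the number of output lines should match the number of input files.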
from https://stackoverflow.com/questions/17875277/reading-file-as-single-record-in-hadoop by cc-by-sa and MIT license