[HADOOP] Reading a file as a single record in Hadoop

I have a huge number of small files. I want to use CombineFileInputFormat to merge them so that each file's data comes into my MR job as a single record. I followed http://yaseminavcular.blogspot.in/2011/03/many-small-input-files.html and tried to convert it to the new API.

I am facing two problems:

a) I am testing with two small files, but two mappers still run. I expected one.

b) Each line comes in as a single record; I want the whole file as a single record.

Please look at the code below, painful as it may be; I am still new to Hadoop.

The driver class:

public class MRDriver extends Configured implements Tool {

@Override
public int run(String[] args) throws Exception {
    FileSystem fs = new Path(".").getFileSystem(getConf());
    fs.printStatistics();
    Job job = new Job(getConf());
    job.setJobName("Enron MR");
    job.setMapperClass(EnronMailReadMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setNumReduceTasks(0);
    job.setJarByClass(EnronMailReadMapper.class);
    RawCombineFileInputFormat.addInputPath(job, new Path(args[0]));
    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
}

public static void main(String[] args) throws Exception  {
    int exitCode = ToolRunner.run(new MRDriver(), args);
    System.exit(exitCode);
}

}
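
The mapper class EnronMailReadMapper is not shown here; for completeness, a minimal sketch that matches the driver's output types (Text key, IntWritable value) might look like the one below. Its body is an assumption for illustration, not the actual class:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical sketch -- receives one record per (whole) file and emits the
// file's first line as the key and the record length as the value.
public class EnronMailReadMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String body = value.toString();
        String firstLine = body.split("\n", 2)[0];
        context.write(new Text(firstLine), new IntWritable(body.length()));
    }
}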

The class below is mostly a copy-paste of LineRecordReader, with the initialize() and nextKeyValue() functions modified:

public class SingleFileRecordReader extends RecordReader<LongWritable, Text> {
  private static final Log LOG = LogFactory.getLog(SingleFileRecordReader.class);

  private long start;
  private long pos;
  private long end;
  private LineReader in;
  private int maxLineLength;
  private LongWritable key = null;
  private Text value = null;

  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                    Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(split.getPath());

    fileIn.seek(start);
    in = new LineReader(fileIn, job);
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
  }

  private int maxBytesToConsume(long pos) {
    return (int) Math.min(Integer.MAX_VALUE, end - pos);
  }

  private long getFilePosition() throws IOException {
    return pos;
  }

  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    StringBuffer totalValue = new StringBuffer();
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end) {
      newSize = in.readLine(value, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      if (newSize == 0) {
        break;
      }
      totalValue.append(value.toString()+"\n");
      pos += newSize;
      if (newSize < maxLineLength) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      value = new Text(totalValue.toString());
      return true;
    }
  }

  @Override
  public LongWritable getCurrentKey() {
    return key;
  }

  @Override
  public Text getCurrentValue() {
    return value;
  }

  /**
   * Get the progress within the split
   */
  public float getProgress() throws IOException {
    if (start == end) {
      return 0.0f;
    } else {
      return Math.min(1.0f,
        (getFilePosition() - start) / (float)(end - start));
    }
  }

  public synchronized void close() throws IOException {
    if (in != null) {
      in.close();
    }
  }

}

The other files:

public class RawCombineFileInputFormat extends CombineFileInputFormat<LongWritable, Text> {

@Override
public RecordReader<LongWritable, Text> createRecordReader(
        InputSplit split, TaskAttemptContext context) throws IOException {
    return new CombineFileRecordReader<LongWritable, Text>((CombineFileSplit) split, context, MultiFileRecordReader.class);
}

}

public class MultiFileRecordReader extends RecordReader<LongWritable, Text> {

 private CombineFileSplit split;
 private TaskAttemptContext context;
 private int index;
 private RecordReader<LongWritable, Text> rr;

public MultiFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) {
    this.split = split;
    this.context = context;
    this.index = index;
    this.rr = new SingleFileRecordReader();
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
    this.split = (CombineFileSplit) split;
    this.context = context;

    if (null == rr) {
        rr = new SingleFileRecordReader();
    }

    FileSplit fileSplit = new FileSplit(this.split.getPath(index),
                                        this.split.getOffset(index),
                                        this.split.getLength(index),
                                        this.split.getLocations());
    this.rr.initialize(fileSplit, this.context);
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    return this.rr.nextKeyValue();
}

@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
    return this.rr.getCurrentKey();
}

@Override
public Text getCurrentValue() throws IOException, InterruptedException {
    return this.rr.getCurrentValue();
}

@Override
public float getProgress() throws IOException, InterruptedException {
    return this.rr.getProgress();
}

@Override
public void close() throws IOException {
    if (rr != null) {
           rr.close();
           rr = null;
    }       
}   

}

Solution

    Take a look at this input format. It is an input format for reading multiple files in a single map task. Each record passed to the mapper reads exactly one whole (unsplit) file. The WholeFileRecordReader takes care of sending one file's content as one value. The key returned is NullWritable and the value is the entire content of each file. Now run your MapReduce job with this input format, see how many mappers actually run, and check that the output is correct.

    Records are produced by WholeFileRecordReader:

    public class WholeFileInputFormat extends CombineFileInputFormat<NullWritable, Text> {

        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            return false;
        }

        /**
         * Creates a CombineFileRecordReader to read each file assigned to this InputSplit.
         * Note, that unlike ordinary InputSplits, split must be a CombineFileSplit, and therefore
         * is expected to specify multiple files.
         *
         * @param split The InputSplit to read.  Throws an IllegalArgumentException if this is
         *        not a CombineFileSplit.
         * @param context The context for this task.
         * @return a CombineFileRecordReader to process each file in split.
         *         It will read each file with a WholeFileRecordReader.
         * @throws IOException if there is an error.
         */
        @Override
        public RecordReader<NullWritable, Text> createRecordReader(
                InputSplit split, TaskAttemptContext context) throws IOException {
            if (!(split instanceof CombineFileSplit)) {
                throw new IllegalArgumentException("split must be a CombineFileSplit");
            }
            return new CombineFileRecordReader<NullWritable, Text>(
                    (CombineFileSplit) split, context, WholeFileRecordReader.class);
        }
    }
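
    As an aside, CombineFileInputFormat also exposes a protected setMaxSplitSize(long) hook: left unset, all the small files can be packed into a single split (which is what allows one mapper here); capped, the files spread across more mappers. A hypothetical sketch of pinning it from the subclass constructor, where the 64 MB figure is an arbitrary assumption:

        // Hypothetical addition, assuming you want to bound how much goes into
        // one combined split; the 64 MB value is only an example.
        public WholeFileInputFormat() {
            setMaxSplitSize(64L * 1024 * 1024);
        }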
    

    The WholeFileRecordReader used above looks like this:

    public class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
        private static final Logger LOG = Logger.getLogger(WholeFileRecordReader.class);
    
          /** The path to the file to read. */
          private final Path mFileToRead;
          /** The length of this file. */
          private final long mFileLength;
    
          /** The Configuration. */
          private final Configuration mConf;
    
          /** Whether this FileSplit has been processed. */
          private boolean mProcessed;
          /** Single Text to store the file name of the current file. */
        //  private final Text mFileName;
          /** Single Text to store the value of this file (the value) when it is read. */
          private final Text mFileText;
    
          /**
           * Implementation detail: This constructor is built to be called via
           * reflection from within CombineFileRecordReader.
           *
           * @param fileSplit The CombineFileSplit that this will read from.
           * @param context The context for this task.
           * @param pathToProcess The path index from the CombineFileSplit to process in this record.
           */
          public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext context,
              Integer pathToProcess) {
            mProcessed = false;
            mFileToRead = fileSplit.getPath(pathToProcess);
            mFileLength = fileSplit.getLength(pathToProcess);
            mConf = context.getConfiguration();
    
            assert 0 == fileSplit.getOffset(pathToProcess);
            if (LOG.isDebugEnabled()) {
              LOG.debug("FileToRead is: " + mFileToRead.toString());
              LOG.debug("Processing path " + pathToProcess + " out of " + fileSplit.getNumPaths());
    
              try {
                FileSystem fs = FileSystem.get(mConf);
                assert fs.getFileStatus(mFileToRead).getLen() == mFileLength;
              } catch (IOException ioe) {
                // oh well, I was just testing.
              }
            }
    
        //    mFileName = new Text();
            mFileText = new Text();
          }
    
          /** {@inheritDoc} */
          @Override
          public void close() throws IOException {
            mFileText.clear();
          }
    
          /**
           * Returns the absolute path to the current file.
           *
           * @return The absolute path to the current file.
           * @throws IOException never.
           * @throws InterruptedException never.
           */
          @Override
          public NullWritable getCurrentKey() throws IOException, InterruptedException {
            return NullWritable.get();
          }
    
          /**
           * <p>Returns the current value.  If the file has been read with a call to nextKeyValue(),
           * this returns the contents of the file as Text.  Otherwise, it returns an
           * empty Text.</p>
           *
           * <p>Throws an IllegalStateException if initialize() is not called first.</p>
           *
           * @return A Text containing the contents of the file to read.
           * @throws IOException never.
           * @throws InterruptedException never.
           */
          @Override
          public Text getCurrentValue() throws IOException, InterruptedException {
            return mFileText;
          }
    
          /**
           * Returns whether the file has been processed or not.  Since only one record
           * will be generated for a file, progress will be 0.0 if it has not been processed,
           * and 1.0 if it has.
           *
           * @return 0.0 if the file has not been processed.  1.0 if it has.
           * @throws IOException never.
           * @throws InterruptedException never.
           */
          @Override
          public float getProgress() throws IOException, InterruptedException {
            return (mProcessed) ? (float) 1.0 : (float) 0.0;
          }
    
          /**
           * All of the internal state is already set on instantiation.  This is a no-op.
           *
           * @param split The InputSplit to read.  Unused.
           * @param context The context for this task.  Unused.
           * @throws IOException never.
           * @throws InterruptedException never.
           */
          @Override
          public void initialize(InputSplit split, TaskAttemptContext context)
              throws IOException, InterruptedException {
            // no-op.
          }
    
          /**
           * <p>If the file has not already been read, this reads it into memory, so that a call
           * to getCurrentValue() will return the entire contents of this file as Text,
           * and getCurrentKey() will return NullWritable.  Then, returns
           * true.  If it has already been read, then returns false without updating any internal state.</p>
           *
           * @return Whether the file was read or not.
           * @throws IOException if there is an error reading the file.
           * @throws InterruptedException if there is an error.
           */
          @Override
          public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!mProcessed) {
              if (mFileLength > (long) Integer.MAX_VALUE) {
                throw new IOException("File is longer than Integer.MAX_VALUE.");
              }
              byte[] contents = new byte[(int) mFileLength];
    
              FileSystem fs = mFileToRead.getFileSystem(mConf);
              FSDataInputStream in = null;
              try {
                // Set the contents of this file.
                in = fs.open(mFileToRead);
                IOUtils.readFully(in, contents, 0, contents.length);
                mFileText.set(contents, 0, contents.length);
    
              } finally {
                IOUtils.closeStream(in);
              }
              mProcessed = true;
              return true;
            }
            return false;
          }
    
    }
    

    And here is your driver code:

    public int run(String[] arg) throws Exception {
        Configuration conf = getConf();
        FileSystem fs = FileSystem.get(conf);
        //estimate reducers
        Job job = new Job(conf);
        job.setJarByClass(WholeFileDriver.class);
        job.setJobName("WholeFile");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setMapperClass(WholeFileMapper.class);
        job.setNumReduceTasks(0);
    
        FileInputFormat.addInputPath(job, new Path(arg[0]));
        Path output=new Path(arg[1]);
        try {
            fs.delete(output, true);
        } catch (IOException e) {
            LOG.warn("Failed to delete temporary path", e);
        }
        FileOutputFormat.setOutputPath(job, output);
    
        boolean ret = job.waitForCompletion(true);
        if (!ret) {
            throw new Exception("Job Failed");
        }
        return 0;
    }
    
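    The WholeFileMapper referenced by the driver is not shown either; a minimal pass-through sketch matching the driver's output types (Text, Text) could look like this, with the body being an assumption rather than the answerer's actual class:

        import java.io.IOException;

        import org.apache.hadoop.io.NullWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Mapper;

        // Hypothetical sketch -- receives one whole file per record
        // (NullWritable key, Text value) and writes the contents through.
        public class WholeFileMapper extends Mapper<NullWritable, Text, Text, Text> {

            private static final Text EMPTY_KEY = new Text("");

            @Override
            protected void map(NullWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // value holds the entire contents of one input file.
                context.write(EMPTY_KEY, value);
            }
        }
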
From https://stackoverflow.com/questions/17875277/reading-file-as-single-record-in-hadoop, licensed under cc-by-sa and MIT.