복붙노트

[HADOOP] 파일 이름 + 오프셋을 포함하도록 SequenceFileInputFormat 확장

HADOOP

파일 이름 + 오프셋을 포함하도록 SequenceFileInputFormat 확장

시퀀스 파일을 읽는 추가 사용자 지정 InputFormat을 만들 수 있지만 레코드가있는 해당 파일 내에서 파일 경로와 오프셋을 노출하고 싶습니다.

한 걸음 물러나려면 여기에 유스 케이스가 있습니다. 가변 크기의 데이터가 포함 된 시퀀스 파일이 있습니다. 키는 대부분 관련성이 없으며 값은 다양한 필드가 포함 된 최대 2 메가 바이트입니다. 나는 파일 이름과 오프셋과 함께 elasticsearch에서 이러한 필드의 일부를 색인하고 싶다. 이렇게하면 elasticsearch에서 해당 필드를 쿼리 한 다음 전체 파일을 ES에 저장하는 대신 시퀀스 파일로 돌아가서 원본 레코드를 가져 오기 위해 파일 이름과 오프셋을 사용할 수 있습니다.

나는이 모든 과정을 단일 자바 프로그램으로 작업한다. SequenceFile.Reader 클래스는 편리하게 getPosition과 seek 메소드를 제공하여이를 가능하게합니다.

그러나 궁극적으로 수 테라 바이트의 데이터가 포함되므로이 데이터를 MapReduce 작업 (아마도 맵 전용)으로 변환해야 할 것입니다. 시퀀스 파일의 실제 키가 관련이 없기 때문에 내가 취하고 자하는 접근 방식은 SquenceFileInputFormat을 확장하거나 어떻게 사용하는지 사용자 정의 InputFormat을 만드는 것이지만 실제 키를 반환하는 대신 파일로 구성된 복합 키를 반환합니다 및 오프셋.

그러나 실제로는 더 어려워지고 있습니다. 그것은 가능해야하지만, 실제 API와 무엇이 노출되었는지에 따라 까다로운 일입니다. 어떤 아이디어? 어쩌면 내가 취해야 할 대체 접근법일까요?

해결법

  1. ==============================

    1.누구나 비슷한 문제가 발생하면 여기 나와있는 해결책이 있습니다. SequenceFileInputFormat / RecordReader에있는 코드 중 일부를 복제하고 수정하는 것으로 끝났습니다. 서브 클래스 나 데코레이터 또는 뭔가를 쓰려고했는데 ...이 방법은별로 좋지는 않지만 작동합니다.

    누구나 비슷한 문제가 발생하면 여기 나와있는 해결책이 있습니다. SequenceFileInputFormat / RecordReader에있는 코드 중 일부를 복제하고 수정하는 것으로 끝났습니다. 서브 클래스 나 데코레이터 또는 뭔가를 쓰려고했는데 ...이 방법은별로 좋지는 않지만 작동합니다.

    SequenceFileOffsetInputFormat.java:

    import java.io.IOException;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    
    public class SequenceFileOffsetInputFormat<V extends Writable> extends FileInputFormat<PathOffsetWritable, V> {
    
        private static class SequenceFileOffsetRecordReader<V extends Writable> extends RecordReader<PathOffsetWritable, V> {
    
            private SequenceFile.Reader in;
            private long start;
            private long end;
            private boolean more = true;
            private PathOffsetWritable key = null;
            private Writable k = null;
            private V value = null;
            private Configuration conf;
    
            @Override
            public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
                FileSplit fileSplit = (FileSplit) split;
                conf = context.getConfiguration();
                Path path = fileSplit.getPath();
                FileSystem fs = path.getFileSystem(conf);
                this.in = new SequenceFile.Reader(fs, path, conf);
                try {
                    this.k = (Writable) in.getKeyClass().newInstance();
                    this.value = (V) in.getValueClass().newInstance();
                } catch (InstantiationException e) {
                    throw new IOException(e);
                } catch (IllegalAccessException e) {
                    throw new IOException(e);
                }
                this.end = fileSplit.getStart() + fileSplit.getLength();
    
                if (fileSplit.getStart() > in.getPosition()) {
                    in.sync(fileSplit.getStart());
                }
    
                this.start = in.getPosition();
                more = start < end;
    
                key = new PathOffsetWritable(path, start);
            }
    
            @Override
            public boolean nextKeyValue() throws IOException, InterruptedException {
                if (!more) {
                    return false;
                }
                long pos = in.getPosition();
    
                more = in.next(k, value);
                if (!more || (pos >= end && in.syncSeen())) {
                    key = null;
                    value = null;
                    more = false;
                } else {
                    key.setOffset(pos);
                }
                return more;
            }
    
            @Override
            public PathOffsetWritable getCurrentKey() {
                return key;
            }
    
            @Override
            public V getCurrentValue() {
                return value;
            }
    
            @Override
            public float getProgress() throws IOException, InterruptedException {
                if (end == start) {
                    return 0.0f;
                } else {
                    return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
                }
            }
    
            @Override
            public void close() throws IOException {
                in.close();
            }
    
        }
    
        @Override
        public RecordReader<PathOffsetWritable, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            return new SequenceFileOffsetRecordReader<V>();
        }
    
        @Override
        public List<InputSplit> getSplits(JobContext context) throws IOException {
            return new SequenceFileInputFormat<PathOffsetWritable, V>().getSplits(context);
        }
    
        @Override
        public long getFormatMinSplitSize() {
            return SequenceFile.SYNC_INTERVAL;
        }
    
    
    }
    

    PathOffsetWritable.java:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    
    public class PathOffsetWritable implements WritableComparable<PathOffsetWritable> {
    
        private Text t = new Text();
        private Path path;
        private long offset;
    
        public PathOffsetWritable(Path path, long offset) {
            this.path = path;
            this.offset = offset;
        }
    
        public Path getPath() {
            return path;
        }
    
        public long getOffset() {
            return offset;
        }
    
        public void setPath(Path path) {
            this.path = path;
        }
    
        public void setOffset(long offset) {
            this.offset = offset;
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            t.readFields(in);
            path = new Path(t.toString());
            offset = in.readLong();
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            t.set(path.toString());
            t.write(out);
            out.writeLong(offset);
        }
    
        @Override
        public int compareTo(PathOffsetWritable o) {
            int x = path.compareTo(o.path);
            if (x != 0) {
                return x;
            } else {
                return Long.valueOf(offset).compareTo(Long.valueOf(o.offset));
            }
        }
    
    
    }
    
  2. from https://stackoverflow.com/questions/18642875/extend-sequencefileinputformat-to-include-file-nameoffset by cc-by-sa and MIT license