복붙노트

[HADOOP] Hadoop HDFS : 기록중인 시퀀스 파일 읽기

HADOOP

Hadoop HDFS : 기록중인 시퀀스 파일 읽기

Hadoop 1.0.3을 사용하고 있습니다.

HDFS에 Hadoop 시퀀스 파일에 로그를 쓰고, 각 로그 묶음 후에 syncFS ()를 호출하지만 파일을 닫지 않습니다 (매일 롤링을 수행 할 때를 제외하고).

내가 보증하기를 원하는 것은 파일이 여전히 쓰여지고있는 동안 독자가 ​​파일을 사용할 수 있다는 것입니다.

FSDataInputStream을 통해 시퀀스 파일의 바이트를 읽을 수 있지만 SequenceFile.Reader.next (key, val)를 사용하려고하면 첫 번째 호출에서 false를 반환합니다.

FSDataInputStream 또는 cat 명령으로 읽을 수 있기 때문에 데이터가 파일에 있다는 것을 알고 있으며 syncFS ()가 호출되었는지 100 % 확신합니다.

namenode 및 datanode 로그를 검사했는데 오류 또는 경고가 표시되지 않았습니다.

SequenceFile.Reader가 현재 작성한 파일을 읽을 수없는 이유는 무엇입니까?

해결법

  1. ==============================

    1.데이터 노드 측에서 읽기가 디스크에 완전히 기록되는지 확인할 수는 없습니다. DFSClient # DFSOutputStream.sync ()의 설명서에서이 내용을 볼 수 있습니다.

    데이터 노드 측에서 읽기가 디스크에 완전히 기록되는지 확인할 수는 없습니다. DFSClient # DFSOutputStream.sync ()의 설명서에서이 내용을 볼 수 있습니다.

      All data is written out to datanodes. It is not guaranteed that data has
      been flushed to persistent store on the datanode. Block allocations are
      persisted on namenode.
    

    따라서 기본적으로 namenode의 블록 맵을 현재 정보로 업데이트하고 데이터 노드로 데이터를 전송합니다. 데이터 노드에서 데이터를 디스크로 플러시 할 수는 없지만 데이터 노드에서 직접 읽으면 데이터가 버퍼링되고 액세스 할 수없는 시간대에 도달하게됩니다. 따라서 시퀀스 파일 판독기는 데이터 스트림이 완료 (또는 비어 있음)하고 비 직렬화 프로세스에 false를 반환하는 추가 바이트를 읽을 수 없다고 생각합니다.

    데이터 노드는 블록이 완전히 수신 된 경우 데이터를 디스크에 씁니다 (미리 작성되었지만 외부에서 읽을 수는 없습니다). 따라서 블록 크기에 도달했거나 파일을 미리 닫은 다음 블록을 완료하면 파일에서 읽을 수 있습니다. 작가가 죽고 제대로 블록을 완성 할 수 없기 때문에 분산 된 환경에서 완전히 이해할 수 있습니다. 이것은 일관성 문제입니다.

    따라서 블록 크기가 매우 작아서 블록이 더 자주 완료되도록 수정해야합니다. 그러나 그렇게 효율적이지는 않으며 귀하의 요구 사항이 HDFS에 적합하지 않다는 것이 분명해야합니다.

  2. ==============================

    2.SequenceFile.Reader가 기록되는 파일을 읽지 못하는 이유는 파일 길이를 사용하여 마법을 수행하기 때문입니다.

    SequenceFile.Reader가 기록되는 파일을 읽지 못하는 이유는 파일 길이를 사용하여 마법을 수행하기 때문입니다.

    첫 번째 블록이 쓰여지는 동안 파일 길이는 0으로 유지되고 블록이 가득 찼을 때만 업데이트됩니다 (기본적으로 64MB). 그런 다음 두 번째 블록이 완전히 기록 될 때까지 파일 크기가 64MB로 고정됩니다.

    즉, FSInputStream을 직접 사용하여 원시 데이터를 읽을 수있는 경우에도 SequenceFile.Reader를 사용하여 시퀀스 파일의 마지막 불완전한 블록을 읽을 수 없습니다.

    파일을 닫으면 파일 길이도 수정되지만, 제 경우에는 파일을 닫아야합니다.

  3. ==============================

    3.그래서 나는 같은 문제를 쳤다. 그리고 조사와 시간이 지나면 나는 다음과 같은 해결 방법을 찾았다.

    그래서 나는 같은 문제를 쳤다. 그리고 조사와 시간이 지나면 나는 다음과 같은 해결 방법을 찾았다.

    따라서 문제는 시퀀스 파일 생성의 내부 구현과 64MB 블록 단위로 업데이트되는 파일 길이를 사용하고 있기 때문입니다.

    그래서 독자를 생성하기 위해 다음 클래스를 생성하고 파일 길이를 리턴하는 get length 메소드를 대체하는 동안 hadoop FS를 내 자신의 것으로 랩핑했습니다.

    public class SequenceFileUtil {
    
        public SequenceFile.Reader createReader(Configuration conf, Path path) throws IOException {
    
            WrappedFileSystem fileSystem = new WrappedFileSystem(FileSystem.get(conf));
    
            return new SequenceFile.Reader(fileSystem, path, conf);
        }
    
        private class WrappedFileSystem extends FileSystem
        {
            private final FileSystem nestedFs;
    
            public WrappedFileSystem(FileSystem fs){
                this.nestedFs = fs;
            }
    
            @Override
            public URI getUri() {
                return nestedFs.getUri();
            }
    
            @Override
            public FSDataInputStream open(Path f, int bufferSize) throws IOException {
                return nestedFs.open(f,bufferSize);
            }
    
            @Override
            public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
                return nestedFs.create(f, permission,overwrite,bufferSize, replication, blockSize, progress);
            }
    
            @Override
            public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
                return nestedFs.append(f, bufferSize, progress);
            }
    
            @Override
            public boolean rename(Path src, Path dst) throws IOException {
                return nestedFs.rename(src, dst);
            }
    
            @Override
            public boolean delete(Path path) throws IOException {
                return nestedFs.delete(path);
            }
    
            @Override
            public boolean delete(Path f, boolean recursive) throws IOException {
                return nestedFs.delete(f, recursive);
            }
    
            @Override
            public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
                return nestedFs.listStatus(f);
            }
    
            @Override
            public void setWorkingDirectory(Path new_dir) {
                nestedFs.setWorkingDirectory(new_dir);
            }
    
            @Override
            public Path getWorkingDirectory() {
                return nestedFs.getWorkingDirectory();
            }
    
            @Override
            public boolean mkdirs(Path f, FsPermission permission) throws IOException {
                return nestedFs.mkdirs(f, permission);
            }
    
            @Override
            public FileStatus getFileStatus(Path f) throws IOException {
                return nestedFs.getFileStatus(f);
            }
    
    
            @Override
            public long getLength(Path f) throws IOException {
    
                DFSClient.DFSInputStream open =  new DFSClient(nestedFs.getConf()).open(f.toUri().getPath());
                long fileLength = open.getFileLength();
                long length = nestedFs.getLength(f);
    
                if (length < fileLength){
                    //We might have uncompleted blocks
                    return fileLength;
                }
    
                return length;
            }
    
    
        }
    }
    
  4. ==============================

    4.나는 비슷한 문제에 봉착했다. 어떻게 해결 했는가? http://mail-archives.apache.org/mod_mbox/hadoop-common-user/201303.mbox/%3CCALtSBbY+LX6fiKutGsybS5oLXxZbVuN0WvW_a5JbExY98hJfig@mail.gmail.com%3E

    나는 비슷한 문제에 봉착했다. 어떻게 해결 했는가? http://mail-archives.apache.org/mod_mbox/hadoop-common-user/201303.mbox/%3CCALtSBbY+LX6fiKutGsybS5oLXxZbVuN0WvW_a5JbExY98hJfig@mail.gmail.com%3E

  5. from https://stackoverflow.com/questions/14377657/hadoop-hdfs-read-sequence-files-that-are-being-written by cc-by-sa and MIT license