복붙노트

[HADOOP] SequenceFile이 잘리는 이유는 무엇입니까?

HADOOP

SequenceFile이 잘리는 이유는 무엇입니까?

하둡을 배우고 있는데이 문제로 잠시 당황했습니다. 기본적으로 SequenceFile을 디스크에 쓰고 다시 읽습니다. 그러나 읽을 때 EOFException이 발생할 때마다. 자세히 살펴보면 시퀀스 파일을 쓸 때 시퀀스 파일이 조기에 잘리고 색인 962를 쓴 후에 항상 발생하며 파일의 크기는 항상 45056 바이트입니다.

MacBook Pro에서 Java 8 및 Hadoop 2.5.1을 사용하고 있습니다. 실제로 Java 7의 다른 Linux 시스템에서 동일한 코드를 시도했지만 동일한 일이 발생합니다.

라이터 / 리더가 제대로 닫히지 않았을 수 있습니다. 코드에 표시된 것처럼 명시 적 writer.close ()와 함께 이전 스타일의 try / catch를 사용해 보았으며 새로운 try-with-resource 접근법을 사용했습니다. 둘 다 작동하지 않습니다.

도움을 주시면 감사하겠습니다.

다음은 내가 사용하는 코드입니다.

public class SequenceFileDemo {

private static final String[] DATA = { "One, two, buckle my shoe",
    "Three, four, shut the door",
    "Five, six, pick up sticks",
    "Seven, eight, lay them straight",
    "Nine, ten, a big fat hen" };

public static void main(String[] args) throws Exception {
    String uri = "file:///Users/andy/Downloads/puzzling.seq";
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);

    Path path = new Path(uri);      
    IntWritable key = new IntWritable();
    Text value = new Text();

    //API change
    try {
        SequenceFile.Writer writer = SequenceFile.createWriter(conf, 
            stream(fs.create(path)),
            keyClass(IntWritable.class),
            valueClass(Text.class));

        for ( int i = 0; i < 1024; i++ ) {
            key.set( i);
            value.clear();
            value.set(DATA[i % DATA.length]);

            writer.append(key, value);
            if ( (i-1) %100 == 0 ) writer.hflush();
            System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
        }

        writer.close();

    } catch (Exception e ) {
        e.printStackTrace();
    }


    try {
        SequenceFile.Reader reader = new SequenceFile.Reader(conf, 
                SequenceFile.Reader.file(path));
        Class<?> keyClass = reader.getKeyClass();
        Class<?> valueClass = reader.getValueClass();

        boolean isWritableSerilization = false;
        try {
            keyClass.asSubclass(WritableComparable.class);
            isWritableSerilization = true;
        } catch (ClassCastException e) {

        }

        if ( isWritableSerilization ) {
            WritableComparable<?> rKey = (WritableComparable<?>) ReflectionUtils.newInstance(keyClass, conf);
            Writable rValue = (Writable) ReflectionUtils.newInstance(valueClass, conf);
            while(reader.next(rKey, rValue)) {
                System.out.printf("[%s] %d %s=%s\n",reader.syncSeen(), reader.getPosition(), rKey, rValue);
            }
        } else {
            //make sure io.seraizliatons has the serialization in use when write the sequence file
        }

        reader.close();
    } catch(IOException e) {
        e.printStackTrace();
    }
}

}

해결법

  1. ==============================

    1.실제로 오류를 발견했습니다. Writer.stream (fs.create (path))에서 작성된 스트림을 닫지 않기 때문입니다.

    실제로 오류를 발견했습니다. Writer.stream (fs.create (path))에서 작성된 스트림을 닫지 않기 때문입니다.

    어떤 이유로 닫기는 방금 만든 스트림으로 전파되지 않습니다. 이것은 내가 생각하는 버그이지만 지금 Jira에서 찾아보기에는 너무 게으르다.

    문제를 해결하는 한 가지 방법은 Writer.file (path)을 대신 사용하는 것입니다.

    분명히 create 스트림을 명시 적으로 닫을 수도 있습니다. 아래에서 올바른 예를 찾으십시오.

        Path path = new Path("file:///tmp/puzzling.seq");
    
        try (FSDataOutputStream stream = fs.create(path)) {
            try (SequenceFile.Writer writer = SequenceFile.createWriter(conf, Writer.stream(stream),
                    Writer.keyClass(IntWritable.class), Writer.valueClass(NullWritable.class))) {
    
                for (int i = 0; i < 1024; i++) {
                    writer.append(new IntWritable(i), NullWritable.get());
                }
            }
        }
    
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, Reader.file(path))) {
            Class<?> keyClass = reader.getKeyClass();
            Class<?> valueClass = reader.getValueClass();
    
            WritableComparable<?> rKey = (WritableComparable<?>) ReflectionUtils.newInstance(keyClass, conf);
            Writable rValue = (Writable) ReflectionUtils.newInstance(valueClass, conf);
            while (reader.next(rKey, rValue)) {
                System.out.printf("%s = %s\n", rKey, rValue);
            }
    
        }
    
  2. ==============================

    2.쓰기 루프 후 writer.close ()가 누락 된 것 같습니다. 그러면 읽기를 시작하기 전에 최종 플러시가 보장됩니다.

    쓰기 루프 후 writer.close ()가 누락 된 것 같습니다. 그러면 읽기를 시작하기 전에 최종 플러시가 보장됩니다.

  3. ==============================

    3.토마스에게 감사합니다.

    토마스에게 감사합니다.

    작가가 그렇지 않은 스트림을 "소유"한 경우로 요약됩니다. 라이터를 작성할 때 Writer.file (path) 옵션을 전달하면 라이터는 내부적으로 작성된 기본 스트림을 "소유"하고 close ()가 호출 될 때이를 닫습니다. 그러나 우리가 Writer.stream (aStream)을 전달하면, 작성자는 다른 사람이 해당 스트림에 대한 응답이라고 가정하고 close ()가 호출 될 때 닫히지 않습니다. 간단히 말해서, 그것은 버그가 아니며, 나는 그것을 충분히 이해하지 못한다는 것입니다. .

  4. from https://stackoverflow.com/questions/27916872/why-the-sequencefile-is-truncated by cc-by-sa and MIT license