복붙노트

[HADOOP] hadoop의 MultipleOutputFormat

HADOOP

hadoop의 MultipleOutputFormat

나는 하둡의 초보자입니다. 나는 워드 카운트 프로그램을 시험 중입니다.

이제 다중 출력 파일을 시험해보기 위해 MultipleOutputFormat을 사용합니다. 이 링크를 사용하면 도움이되었습니다. http://hadoop.apache.org/common/docs/r0.19.0/api/org/apache/hadoop/mapred/lib/MultipleOutputs.html

내 운전자 수업에서 나는

    MultipleOutputs.addNamedOutput(conf, "even",
            org.apache.hadoop.mapred.TextOutputFormat.class, Text.class,
            IntWritable.class);

    MultipleOutputs.addNamedOutput(conf, "odd",
            org.apache.hadoop.mapred.TextOutputFormat.class, Text.class,
            IntWritable.class);`

내 수업이 줄어들어

public static class Reduce extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {
    MultipleOutputs mos = null;

    public void configure(JobConf job) {
        mos = new MultipleOutputs(job);
    }

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        if (sum % 2 == 0) {
            mos.getCollector("even", reporter).collect(key, new IntWritable(sum));
        }else {
            mos.getCollector("odd", reporter).collect(key, new IntWritable(sum));
        }
        //output.collect(key, new IntWritable(sum));
    }
    @Override
    public void close() throws IOException {
        // TODO Auto-generated method stub
    mos.close();
    }
}

모든 것이 효과적 이었지만 많은 파일을 받았습니다 (모든지도 축소에 대해 하나의 홀수와 하나의 파일)

질문 : 어떻게 모든 출력물의 모든 홀수 출력이 홀수 파일에 쓰여지고 짝수 번째 출력물이 짝수 번째 출력물이되도록 홀수 & 짝수 출력 파일을 2 개만 가질 수 있습니까?

해결법

  1. ==============================

    1.각 감속기는 OutputFormat을 사용하여 레코드를 기록합니다. 그래서 당신은 감속기 당 홀수 및 짝수 파일 세트를 얻는 것입니다. 이는 각 감속기가 병렬로 쓰기를 수행 할 수 있도록 설계된 것입니다.

    각 감속기는 OutputFormat을 사용하여 레코드를 기록합니다. 그래서 당신은 감속기 당 홀수 및 짝수 파일 세트를 얻는 것입니다. 이는 각 감속기가 병렬로 쓰기를 수행 할 수 있도록 설계된 것입니다.

    홀수와 홀수의 짝수 파일을 원한다면 mapred.reduce.tasks를 1로 설정해야합니다. 그러나 모든 맵퍼가 단일 감속기로 공급되기 때문에 성능이 저하됩니다.

    또 다른 옵션은이 파일을 읽는 프로세스를 변경하여 여러 입력 파일을 허용하거나이 파일을 병합하는 별도의 프로세스를 작성하는 것입니다.

  2. ==============================

    2.나는 이것을하기위한 수업을 썼다. 그냥 당신의 직업을 사용하십시오 :

    나는 이것을하기위한 수업을 썼다. 그냥 당신의 직업을 사용하십시오 :

    job.setOutputFormatClass(m_customOutputFormatClass);
    

    이것은 내 수업입니다.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Map.Entry;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    /**
     * TextOutputFormat extension which enables writing the mapper/reducer's output in multiple files.<br>
     * <p>
     * <b>WARNING</b>: The number of different folder shuoldn't be large for one mapper since we keep an
     * {@link RecordWriter} instance per folder name.
     * </p>
     * <p>
     * In this class the folder name is defined by the written entry's key.<br>
     * To change this behavior simply extend this class and override the
     * {@link HdMultipleFileOutputFormat#getFolderNameExtractor()} method and create your own
     * {@link FolderNameExtractor} implementation.
     * </p>
     * 
     * 
     * @author ykesten
     * 
     * @param <K> - Keys type
     * @param <V> - Values type
     */
    public class HdMultipleFileOutputFormat<K, V> extends TextOutputFormat<K, V> {
    
        private String folderName;
    
        private class MultipleFilesRecordWriter extends RecordWriter<K, V> {
    
            private Map<String, RecordWriter<K, V>> fileNameToWriter;
            private FolderNameExtractor<K, V> fileNameExtractor;
            private TaskAttemptContext job;
    
            public MultipleFilesRecordWriter(FolderNameExtractor<K, V> fileNameExtractor, TaskAttemptContext job) {
                fileNameToWriter = new HashMap<String, RecordWriter<K, V>>();
                this.fileNameExtractor = fileNameExtractor;
                this.job = job;
            }
    
            @Override
            public void write(K key, V value) throws IOException, InterruptedException {
                String fileName = fileNameExtractor.extractFolderName(key, value);
                RecordWriter<K, V> writer = fileNameToWriter.get(fileName);
                if (writer == null) {
                    writer = createNewWriter(fileName, fileNameToWriter, job);
                    if (writer == null) {
                        throw new IOException("Unable to create writer for path: " + fileName);
                    }
                }
                writer.write(key, value);
            }
    
            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                for (Entry<String, RecordWriter<K, V>> entry : fileNameToWriter.entrySet()) {
                    entry.getValue().close(context);
                }
            }
    
        }
    
        private synchronized RecordWriter<K, V> createNewWriter(String folderName,
                Map<String, RecordWriter<K, V>> fileNameToWriter, TaskAttemptContext job) {
            try {
                this.folderName = folderName;
                RecordWriter<K, V> writer = super.getRecordWriter(job);
                this.folderName = null;
                fileNameToWriter.put(folderName, writer);
                return writer;
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }
    
        @Override
        public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
            Path path = super.getDefaultWorkFile(context, extension);
            if (folderName != null) {
                String newPath = path.getParent().toString() + "/" + folderName + "/" + path.getName();
                path = new Path(newPath);
            }
            return path;
        }
    
        @Override
        public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
            return new MultipleFilesRecordWriter(getFolderNameExtractor(), job);
        }
    
        public FolderNameExtractor<K, V> getFolderNameExtractor() {
            return new KeyFolderNameExtractor<K, V>();
        }
    
        public interface FolderNameExtractor<K, V> {
            public String extractFolderName(K key, V value);
        }
    
        private static class KeyFolderNameExtractor<K, V> implements FolderNameExtractor<K, V> {
            public String extractFolderName(K key, V value) {
                return key.toString();
            }
        }
    
    }
    
  3. ==============================

    3.여러 개의 출력 파일은 감속기의 수에 따라 생성됩니다.

    여러 개의 출력 파일은 감속기의 수에 따라 생성됩니다.

    hadoop dfs -getmerge를 사용하여 출력을 병합 할 수 있습니다.

  4. from https://stackoverflow.com/questions/3491105/multipleoutputformat-in-hadoop by cc-by-sa and MIT license