[HADOOP] hadoop의 MultipleOutputFormat
HADOOPhadoop의 MultipleOutputFormat
나는 하둡의 초보자입니다. 나는 워드 카운트 프로그램을 시험 중입니다.
이제 다중 출력 파일을 시험해보기 위해 MultipleOutputFormat을 사용합니다. 이 링크를 사용하면 도움이되었습니다. http://hadoop.apache.org/common/docs/r0.19.0/api/org/apache/hadoop/mapred/lib/MultipleOutputs.html
내 운전자 수업에서 나는
MultipleOutputs.addNamedOutput(conf, "even",
org.apache.hadoop.mapred.TextOutputFormat.class, Text.class,
IntWritable.class);
MultipleOutputs.addNamedOutput(conf, "odd",
org.apache.hadoop.mapred.TextOutputFormat.class, Text.class,
IntWritable.class);`
내 수업이 줄어들어
public static class Reduce extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
MultipleOutputs mos = null;
public void configure(JobConf job) {
mos = new MultipleOutputs(job);
}
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
if (sum % 2 == 0) {
mos.getCollector("even", reporter).collect(key, new IntWritable(sum));
}else {
mos.getCollector("odd", reporter).collect(key, new IntWritable(sum));
}
//output.collect(key, new IntWritable(sum));
}
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
mos.close();
}
}
모든 것이 효과적 이었지만 많은 파일을 받았습니다 (모든지도 축소에 대해 하나의 홀수와 하나의 파일)
질문 : 어떻게 모든 출력물의 모든 홀수 출력이 홀수 파일에 쓰여지고 짝수 번째 출력물이 짝수 번째 출력물이되도록 홀수 & 짝수 출력 파일을 2 개만 가질 수 있습니까?
해결법
-
==============================
1.각 감속기는 OutputFormat을 사용하여 레코드를 기록합니다. 그래서 당신은 감속기 당 홀수 및 짝수 파일 세트를 얻는 것입니다. 이는 각 감속기가 병렬로 쓰기를 수행 할 수 있도록 설계된 것입니다.
각 감속기는 OutputFormat을 사용하여 레코드를 기록합니다. 그래서 당신은 감속기 당 홀수 및 짝수 파일 세트를 얻는 것입니다. 이는 각 감속기가 병렬로 쓰기를 수행 할 수 있도록 설계된 것입니다.
홀수와 홀수의 짝수 파일을 원한다면 mapred.reduce.tasks를 1로 설정해야합니다. 그러나 모든 맵퍼가 단일 감속기로 공급되기 때문에 성능이 저하됩니다.
또 다른 옵션은이 파일을 읽는 프로세스를 변경하여 여러 입력 파일을 허용하거나이 파일을 병합하는 별도의 프로세스를 작성하는 것입니다.
-
==============================
2.나는 이것을하기위한 수업을 썼다. 그냥 당신의 직업을 사용하십시오 :
나는 이것을하기위한 수업을 썼다. 그냥 당신의 직업을 사용하십시오 :
job.setOutputFormatClass(m_customOutputFormatClass);
이것은 내 수업입니다.
import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /** * TextOutputFormat extension which enables writing the mapper/reducer's output in multiple files.<br> * <p> * <b>WARNING</b>: The number of different folder shuoldn't be large for one mapper since we keep an * {@link RecordWriter} instance per folder name. * </p> * <p> * In this class the folder name is defined by the written entry's key.<br> * To change this behavior simply extend this class and override the * {@link HdMultipleFileOutputFormat#getFolderNameExtractor()} method and create your own * {@link FolderNameExtractor} implementation. * </p> * * * @author ykesten * * @param <K> - Keys type * @param <V> - Values type */ public class HdMultipleFileOutputFormat<K, V> extends TextOutputFormat<K, V> { private String folderName; private class MultipleFilesRecordWriter extends RecordWriter<K, V> { private Map<String, RecordWriter<K, V>> fileNameToWriter; private FolderNameExtractor<K, V> fileNameExtractor; private TaskAttemptContext job; public MultipleFilesRecordWriter(FolderNameExtractor<K, V> fileNameExtractor, TaskAttemptContext job) { fileNameToWriter = new HashMap<String, RecordWriter<K, V>>(); this.fileNameExtractor = fileNameExtractor; this.job = job; } @Override public void write(K key, V value) throws IOException, InterruptedException { String fileName = fileNameExtractor.extractFolderName(key, value); RecordWriter<K, V> writer = fileNameToWriter.get(fileName); if (writer == null) { writer = createNewWriter(fileName, fileNameToWriter, job); if (writer == null) { throw new IOException("Unable to create writer for path: " + fileName); } } writer.write(key, value); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { for (Entry<String, RecordWriter<K, V>> entry : fileNameToWriter.entrySet()) { entry.getValue().close(context); } } } private synchronized RecordWriter<K, V> createNewWriter(String folderName, Map<String, RecordWriter<K, V>> fileNameToWriter, TaskAttemptContext job) { try { this.folderName = folderName; RecordWriter<K, V> writer = super.getRecordWriter(job); this.folderName = null; fileNameToWriter.put(folderName, writer); return writer; } catch (Exception e) { e.printStackTrace(); return null; } } @Override public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException { Path path = super.getDefaultWorkFile(context, extension); if (folderName != null) { String newPath = path.getParent().toString() + "/" + folderName + "/" + path.getName(); path = new Path(newPath); } return path; } @Override public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { return new MultipleFilesRecordWriter(getFolderNameExtractor(), job); } public FolderNameExtractor<K, V> getFolderNameExtractor() { return new KeyFolderNameExtractor<K, V>(); } public interface FolderNameExtractor<K, V> { public String extractFolderName(K key, V value); } private static class KeyFolderNameExtractor<K, V> implements FolderNameExtractor<K, V> { public String extractFolderName(K key, V value) { return key.toString(); } } }
-
==============================
3.여러 개의 출력 파일은 감속기의 수에 따라 생성됩니다.
여러 개의 출력 파일은 감속기의 수에 따라 생성됩니다.
hadoop dfs -getmerge를 사용하여 출력을 병합 할 수 있습니다.
from https://stackoverflow.com/questions/3491105/multipleoutputformat-in-hadoop by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 감속기에서 매퍼 카운터에 액세스 (0) | 2019.05.29 |
---|---|
[HADOOP] 값을 두 번 반복 (MapReduce) (0) | 2019.05.29 |
[HADOOP] 로컬 Hadoop 2.6 설치에서 S3 / S3n에 어떻게 액세스합니까? (0) | 2019.05.29 |
[HADOOP] .txt 파일을 Hadoop의 시퀀스 파일 형식으로 변환하는 방법 (0) | 2019.05.29 |
[HADOOP] 원사 개념 이해에 촉발 (0) | 2019.05.29 |