복붙노트

[HADOOP] 파일 형식이 사용자 정의 형식 인 경우 Hadoop MultipleOutputs가 여러 파일에 쓰지 않습니다

HADOOP

파일 형식이 사용자 정의 형식 인 경우 Hadoop MultipleOutputs가 여러 파일에 쓰지 않습니다

cassandra에서 읽고 MultipleOutputs api (Hadoop 버전 1.0.3)를 사용하여 감속기 출력을 여러 출력 파일에 쓰려고합니다. 필자의 경우 파일 형식은 FileOutputFormat을 확장하는 사용자 정의 출력 형식입니다. MultipleOutputs api에 표시된 것과 비슷한 방식으로 작업을 구성했습니다. 그러나 작업을 실행할 때 텍스트 출력 형식의 part-r-0000이라는 출력 파일 하나만 얻습니다. job.setOutputFormatClass ()가 설정되지 않은 경우 기본적으로 TextOutputFormat을 형식으로 간주합니다. 또한 두 형식 클래스 중 하나만 초기화 할 수 있습니다. MulitpleOutputs.addNamedOutput (job, "format1", MyCustomFileFormat1.class, Text.class, Text.class) 및 MulitpleOutputs.addNamedOutput (job, "format2", MyCustomFileFormat2.class, Text.class,에서 지정한 출력 형식을 완전히 무시합니다. Text.class). 다른 사람이 비슷한 문제에 직면하고 있습니까? 아니면 내가 잘못하고 있습니까?

또한 텍스트 파일에서 읽고 MultipleOutputs api에 표시된 것처럼 출력을 2 가지 형식의 TextOutputFormat 및 SequenceFileOutputFormat으로 쓰는 매우 간단한 MR 프로그램을 작성하려고했습니다. 그러나 행운도 없습니다. 텍스트 출력 형식으로 하나의 출력 파일 만 얻습니다.

누군가 나를 도울 수 있습니까?

Job job = new Job(getConf(), "cfdefGen");
job.setJarByClass(CfdefGeneration.class);

//read input from cassandra column family
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
job.setInputFormatClass(ColumnFamilyInputFormat.class);
job.getConfiguration().set("cassandra.consistencylevel.read", "QUORUM");

//thrift input job configurations
ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
ConfigHelper.setInputInitialAddress(job.getConfiguration(), HOST);
ConfigHelper.setInputPartitioner(job.getConfiguration(), "RandomPartitioner");

SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes("classification")));
//ConfigHelper.setRangeBatchSize(job.getConfiguration(), 2048);
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

//specification for mapper
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

//specifications for reducer (writing to files)
job.setReducerClass(ReducerToFileSystem.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//job.setOutputFormatClass(MyCdbWriter1.class);
job.setNumReduceTasks(1);

//set output path for storing output files
Path filePath = new Path(OUTPUT_DIR);
FileSystem hdfs = FileSystem.get(getConf());
if(hdfs.exists(filePath)){
    hdfs.delete(filePath, true);
}
MyCdbWriter1.setOutputPath(job, new Path(OUTPUT_DIR));

MultipleOutputs.addNamedOutput(job, "cdb1', MyCdbWriter1.class, Text.class, Text.class);
MultipleOutputs.addNamedOutput(job, "cdb2", MyCdbWriter2.class, Text.class, Text.class);

boolean success = job.waitForCompletion(true);
return success ? 0:1;

public static class ReducerToFileSystem extends Reducer<Text, Text, Text, Text>
{
    private MultipleOutputs<Text, Text> mos;

    public void setup(Context context){
        mos = new MultipleOutputs<Text, Text>(context);
    }

    //public void reduce(Text key, Text value, Context context) 
    //throws IOException, InterruptedException (This was the mistake, changed the signature and it worked fine)
    public void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException
    {
        //context.write(key, value);
        mos.write("cdb1", key, value, OUTPUT_DIR+"/"+"cdb1");
        mos.write("cdb2", key, value, OUTPUT_DIR+"/"+"cdb2");
        context.progress();
    }

    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}

public class MyCdbWriter1<K, V> extends FileOutputFormat<K, V> 
{
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException 
    {
    }

    public static void setOutputPath(Job job, Path outputDir) {
        job.getConfiguration().set("mapred.output.dir", outputDir.toString());
    }

    protected static class CdbDataRecord<K, V> extends RecordWriter<K, V>
    {
        @override
        write()
        close()
    }
}

해결법

  1. ==============================

    1.reduce 메소드가 호출되지 않는다는 디버깅 후 실수를 발견했습니다. 내 함수 정의가 API의 정의와 일치하지 않아 public void reduce (텍스트 키, 텍스트 값, 컨텍스트 컨텍스트)에서 public void reduce (텍스트 키, Iterable 값, 컨텍스트 컨텍스트)로 변경되었습니다. reduce 메소드에 @Override 태그가없는 이유를 모르겠습니다. 실수를 방지했을 것입니다.

    reduce 메소드가 호출되지 않는다는 디버깅 후 실수를 발견했습니다. 내 함수 정의가 API의 정의와 일치하지 않아 public void reduce (텍스트 키, 텍스트 값, 컨텍스트 컨텍스트)에서 public void reduce (텍스트 키, Iterable 값, 컨텍스트 컨텍스트)로 변경되었습니다. reduce 메소드에 @Override 태그가없는 이유를 모르겠습니다. 실수를 방지했을 것입니다.

  2. ==============================

    2.또한 비슷한 문제가 발생했습니다.지도 프로세스에서 모든 레코드를 필터링하여 Reduce에 전달되지 않는 것으로 나타났습니다. Reduce 작업에서 이름이 지정되지 않은 다중 출력을 사용하면 여전히 _SUCCESS 파일과 빈 part-r-00000 파일이 생성됩니다.

    또한 비슷한 문제가 발생했습니다.지도 프로세스에서 모든 레코드를 필터링하여 Reduce에 전달되지 않는 것으로 나타났습니다. Reduce 작업에서 이름이 지정되지 않은 다중 출력을 사용하면 여전히 _SUCCESS 파일과 빈 part-r-00000 파일이 생성됩니다.

  3. from https://stackoverflow.com/questions/12981233/hadoop-multipleoutputs-does-not-write-to-multiple-files-when-file-formats-are-cu by cc-by-sa and MIT license