복붙노트

[HADOOP] 하둡 MapReduce를 함께 사용 WholeFileInputFormat 여전히 한번에 1 라인 매퍼 처리 결과

HADOOP

하둡 MapReduce를 함께 사용 WholeFileInputFormat 여전히 한번에 1 라인 매퍼 처리 결과

하둡 2.6을 사용하여 내 헤더 확장 .. 한 번에 한 줄 대신 내 매퍼에 전체 파일을 전송해야합니다. 나는 WholeFileInputFormat 및 WholeFileRecordReader을 만들기 위해 확실한 가이드 톰 백상 코드를 따랐지만 내 매퍼는 여전히 파일을 한 번에 한 라인을 처리 중입니다. 사람이 내 코드에서 누락 무엇을 볼 수 있을까요? 나는 정확히 내가 무엇을 볼 수에서 책 예제를 사용했다. 모든 지침은 많이 이해할 수있을 것이다.

WholeFileInputFormat.java

public class WholeFileInputFormat extends FileInputFormat <NullWritable, BytesWritable>{

@Override
protected boolean isSplitable(JobContext context, Path file){
    return false;
}

@Override
public RecordReader<NullWritable, BytesWritable> createRecordReader(
        InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    WholeFileRecordReader reader = new WholeFileRecordReader();
    reader.initialize(split, context);
    return reader;
}

}

WholeFileRecordReader.java

public class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
private FileSplit fileSplit;
private Configuration conf;
private BytesWritable value = new BytesWritable();
private boolean processed = false;

@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException{
    this.fileSplit = (FileSplit) split;
    this.conf = context.getConfiguration();
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException{
    if (!processed){
        byte[] contents = new byte[(int) fileSplit.getLength()];
        Path file = fileSplit.getPath();
        FileSystem fs = file.getFileSystem(conf);
        FSDataInputStream in = null;
        try{
            in = fs.open(file);
            IOUtils.readFully(in, contents, 0, contents.length);
            value.set(contents, 0, contents.length);
        }finally{
            IOUtils.closeStream(in);
        }
        processed = true;
        return  true;
    }
    return false;
}

@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException{
    return NullWritable.get();
}

@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException{
    return value;
}

@Override
public float getProgress() throws IOException {
    return processed ? 1.0f : 0.0f;
}

@Override
public void close() throws IOException{
    //do nothing :)
}

}

그리고 내 맵리 듀스를위한 주요 방법

public class ECCCount {
public static void main(String[] args) throws Exception {

    if (args.length != 2) {
      System.out.printf("Usage: ProcessLogs <input dir> <output dir>\n");
      System.exit(-1);
    }

    //@SuppressWarnings("deprecation")
    Job job = new Job();
    job.setJarByClass(ECCCount.class);
    job.setJobName("ECCCount");

    //FileInputFormat.setInputPaths(job, new Path(args[0]));
    WholeFileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(ECCCountMapper.class);
    job.setReducerClass(SumReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    boolean success = job.waitForCompletion(true);
    System.exit(success ? 0 : 1);
  }

}

좋은 측정을 위해 그리고 내 매퍼. 그것의 선 또는 전체 파일을 반환하는 경우 지금은 단순히 볼 수있는 테스트 케이스로 주어진 값의를 반환

public class ECCCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

      context.write(new Text(value), new IntWritable(1));
  }

}

해결법

  1. ==============================

    1.문제는 매퍼의 입력 형식이 될 수 있습니다. 당신은 LongWritable 및 텍스트가 있습니다. 그는 WholeFileInputFormat는 데 무엇 때문에 언급 한 예에서, 그들은 NullWritable, BytesWritable을 사용했다. 또한, job.setInputFormatClass (WholeFileInputFormat.class)을 줄 필요가있다; 에서 작업 클래스 (main 메소드). 희망이 도움과 행복 코딩

    문제는 매퍼의 입력 형식이 될 수 있습니다. 당신은 LongWritable 및 텍스트가 있습니다. 그는 WholeFileInputFormat는 데 무엇 때문에 언급 한 예에서, 그들은 NullWritable, BytesWritable을 사용했다. 또한, job.setInputFormatClass (WholeFileInputFormat.class)을 줄 필요가있다; 에서 작업 클래스 (main 메소드). 희망이 도움과 행복 코딩

  2. ==============================

    2.램지의 입력 덕분에 내 오류를 발견하고 다음과 같이 변경과 함께 전달 된 전체 파일을 얻을 수 있었다

    램지의 입력 덕분에 내 오류를 발견하고 다음과 같이 변경과 함께 전달 된 전체 파일을 얻을 수 있었다

    내 주요 방법은 내가 사용하는 데 필요한 InputFormatClass을 지정했습니다.

    job.setInputFormatClass(WholeFileInputFormat.class)
    

    내 매퍼 입력으로 올바른 유형을 기대해야

    public class ECCCountMapper extends Mapper<NullWritable, BytesWritable, Text, IntWritable>{
    

    그 두 가지 변화가 성공적으로 필요에 따라 그것을 조작하는 내 매퍼에 전체 파일의 바이트 []를 보냈다.

  3. from https://stackoverflow.com/questions/30649590/using-wholefileinputformat-with-hadoop-mapreduce-still-results-in-mapper-process by cc-by-sa and MIT license