복붙노트

[HADOOP] 내 시퀀스 파일이 내 하프 매퍼 클래스에서 두 번 읽히는 이유는 무엇입니까?

HADOOP

내 시퀀스 파일이 내 하프 매퍼 클래스에서 두 번 읽히는 이유는 무엇입니까?

1264 레코드가있는 SequenceFile 있습니다. 각 키는 각 레코드마다 고유합니다. 내 문제는 내 매퍼가이 파일을 두 번 읽는 것으로 보이거나 두 번 읽히는 것입니다. 온전한 검사를 위해 SequenceFile을 읽으려면 약간의 유틸리티 클래스를 작성했지만 실제로는 1264 개의 레코드 (예 : SequenceFile.Reader) 만 있습니다.

필자는 감속기에서 Iterable 당 1 레코드 만 가져야합니다. 그러나 iterable (반복기)을 반복 할 때마다 키 당 2 개의 레코드가 생성됩니다 (키당 2 개, 키 당 1 또는 3 또는 다른 것이 아닙니다).

내 작업의 로깅 결과는 다음과 같습니다. 나는 왜 그런지 모르지만 왜 "프로세스에 대한 총 입력 경로"가 2인가? 내 Job을 실행할 때, -Dmapred.input.dir = / data와 -Dmapred.input.dir = / data / part-r-00000을 시도했지만 여전히 처리 할 전체 경로는 2입니다.

어떤 아이디어라도 감사합니다.

12/03/01 05:28:30 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
12/03/01 05:28:30 INFO input.FileInputFormat: Total input paths to process : 2
12/03/01 05:28:31 INFO mapred.JobClient: Running job: job_local_0001
12/03/01 05:28:31 INFO input.FileInputFormat: Total input paths to process : 2
12/03/01 05:28:31 INFO mapred.MapTask: io.sort.mb = 100
12/03/01 05:28:31 INFO mapred.MapTask: data buffer = 79691776/99614720
12/03/01 05:28:31 INFO mapred.MapTask: record buffer = 262144/327680
12/03/01 05:28:31 INFO mapred.MapTask: Starting flush of map output
12/03/01 05:28:31 INFO mapred.MapTask: Finished spill 0
12/03/01 05:28:31 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
12/03/01 05:28:31 INFO mapred.LocalJobRunner:
12/03/01 05:28:31 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
12/03/01 05:28:31 INFO mapred.MapTask: io.sort.mb = 100
12/03/01 05:28:31 INFO mapred.MapTask: data buffer = 79691776/99614720
12/03/01 05:28:31 INFO mapred.MapTask: record buffer = 262144/327680
12/03/01 05:28:31 INFO mapred.MapTask: Starting flush of map output
12/03/01 05:28:31 INFO mapred.MapTask: Finished spill 0
12/03/01 05:28:31 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
12/03/01 05:28:31 INFO mapred.LocalJobRunner:
12/03/01 05:28:31 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.
12/03/01 05:28:31 INFO mapred.LocalJobRunner:
12/03/01 05:28:31 INFO mapred.Merger: Merging 2 sorted segments
12/03/01 05:28:31 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 307310 bytes
12/03/01 05:28:31 INFO mapred.LocalJobRunner:
12/03/01 05:28:32 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
12/03/01 05:28:32 INFO mapred.LocalJobRunner:
12/03/01 05:28:32 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
12/03/01 05:28:32 INFO mapred.JobClient:  map 100% reduce 0%
12/03/01 05:28:32 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to results
12/03/01 05:28:32 INFO mapred.LocalJobRunner: reduce > reduce
12/03/01 05:28:32 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
12/03/01 05:28:33 INFO mapred.JobClient:  map 100% reduce 100%
12/03/01 05:28:33 INFO mapred.JobClient: Job complete: job_local_0001
12/03/01 05:28:33 INFO mapred.JobClient: Counters: 12
12/03/01 05:28:33 INFO mapred.JobClient:   FileSystemCounters
12/03/01 05:28:33 INFO mapred.JobClient:     FILE_BYTES_READ=1320214
12/03/01 05:28:33 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=1275041
12/03/01 05:28:33 INFO mapred.JobClient:   Map-Reduce Framework
12/03/01 05:28:33 INFO mapred.JobClient:     Reduce input groups=1264
12/03/01 05:28:33 INFO mapred.JobClient:     Combine output records=0
12/03/01 05:28:33 INFO mapred.JobClient:     Map input records=2528
12/03/01 05:28:33 INFO mapred.JobClient:     Reduce shuffle bytes=0
12/03/01 05:28:33 INFO mapred.JobClient:     Reduce output records=2528
12/03/01 05:28:33 INFO mapred.JobClient:     Spilled Records=5056
12/03/01 05:28:33 INFO mapred.JobClient:     Map output bytes=301472
12/03/01 05:28:33 INFO mapred.JobClient:     Combine input records=0
12/03/01 05:28:33 INFO mapred.JobClient:     Map output records=2528
12/03/01 05:28:33 INFO mapred.JobClient:     Reduce input records=2528

내 매퍼 클래스는 매우 간단합니다. 그것은 텍스트 파일을 읽습니다. 각 행에 "m"이 추가됩니다.

public class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

 private static final Log _log = LogFactory.getLog(MyMapper.class);

 @Override
 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  String s = (new StringBuilder()).append(value.toString()).append("m").toString();
  context.write(key, new Text(s));
  _log.debug(key.toString() + " => " + s);
 }
}

내 감속기 수업 또한 매우 간단합니다. 단순히 "r"을 줄에 추가합니다.

public class MyReducer extends Reducer<LongWritable, Text, LongWritable, Text> {

private static final Log _log = LogFactory.getLog(MyReducer.class);

@Override
public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 for(Iterator<Text> it = values.iterator(); it.hasNext();) {
  Text txt = it.next();
  String s = (new StringBuilder()).append(txt.toString()).append("r").toString();
  context.write(key, new Text(s));
  _log.debug(key.toString() + " => " + s);
  }
 }
}

내 직업 클래스는 다음과 같습니다.

public class MyJob extends Configured implements Tool {

public static void main(String[] args) throws Exception {
 ToolRunner.run(new Configuration(), new MyJob(), args);
}

@Override
public int run(String[] args) throws Exception {
 Configuration conf = getConf();
 Path input = new Path(conf.get("mapred.input.dir"));
 Path output = new Path(conf.get("mapred.output.dir"));

 System.out.println("input = " + input);
 System.out.println("output = " + output);

 Job job = new Job(conf, "dummy job");
 job.setMapOutputKeyClass(LongWritable.class);
 job.setMapOutputValueClass(Text.class);
 job.setOutputKeyClass(LongWritable.class);
 job.setOutputValueClass(Text.class);

 job.setMapperClass(MyMapper.class);
 job.setReducerClass(MyReducer.class);

 FileInputFormat.addInputPath(job, input);
 FileOutputFormat.setOutputPath(job, output);

 job.setJarByClass(MyJob.class);

 return job.waitForCompletion(true) ? 0 : 1;
 }
}

내 입력 데이터는 다음과 같습니다.

T, T
T, T
T, T
F, F
F, F
F, F
F, F
T, F
F, T

내 Job을 실행 한 후에, 나는 다음과 같은 결과를 얻는다.

0   T, Tmr
0   T, Tmr
6   T, Tmr
6   T, Tmr
12  T, Tmr
12  T, Tmr
18  F, Fmr
18  F, Fmr
24  F, Fmr
24  F, Fmr
30  F, Fmr
30  F, Fmr
36  F, Fmr
36  F, Fmr
42  T, Fmr
42  T, Fmr
48  F, Tmr
48  F, Tmr

내 직업을 설정하는 데 문제가 있었습니까? 나는 내 작업을 실행하기 위해 다음과 같은 방법을 시도했다. 그리고이 접근법에서 파일은 한 번만 읽혀진다. 왜 이런거야? System.out.println (inpath) 및 System.out.println (outpath) 값은 동일합니다! 도움?

public class MyJob2 {

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: MyJob2 <in> <out>");
      System.exit(2);
    }

    String sInput = args[0];
    String sOutput = args[1];

    Path input = new Path(sInput);
    Path output = new Path(sOutput);

    System.out.println("input = " + input);
    System.out.println("output = " + output);

    Job job = new Job(conf, "dummy job");
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReducer.class);

    FileInputFormat.addInputPath(job, input);
    FileOutputFormat.setOutputPath(job, output);

    job.setJarByClass(MyJob2.class);

    int result = job.waitForCompletion(true) ? 0 : 1;
    System.exit(result);
 }
}

해결법

  1. ==============================

    1.나는 hadoop 메일 링리스트의 도움을 받았다. 내 문제는 아래 줄에 있었다.

    나는 hadoop 메일 링리스트의 도움을 받았다. 내 문제는 아래 줄에 있었다.

    FileInputFormat.addInputPath(job, input);
    

    이 줄은 간단히 입력을 config에 추가합니다. 이 행을 주석 처리 한 후 입력 파일은 한 번만 읽습니다. 사실, 나는 또한 다른 라인을 주석 처리했다.

    FileOutputFormat.setOutputPath(job, output);
    

    모든 것이 여전히 작동합니다.

  2. ==============================

    2.나는 비슷한 문제가 있었지만 다른 이유가있다 : 리눅스는 분명히 내 입력 파일 (~ input.txt)의 숨겨진 사본을 만들었으므로이 오류를 얻는 두 번째 방법이다.

    나는 비슷한 문제가 있었지만 다른 이유가있다 : 리눅스는 분명히 내 입력 파일 (~ input.txt)의 숨겨진 사본을 만들었으므로이 오류를 얻는 두 번째 방법이다.

  3. from https://stackoverflow.com/questions/9514710/why-is-my-sequence-file-being-read-twice-in-my-hadoop-mapper-class by cc-by-sa and MIT license