[HADOOP] (하둡) MapReduce - 체인 작업 - JobControl이 멈추지 않습니다
HADOOP(하둡) MapReduce - 체인 작업 - JobControl이 멈추지 않습니다
두 개의 MapReduce 작업을 연결해야합니다. JobControl을 사용하여 job2를 job1에 종속적으로 설정했습니다. 그것은 작동, 출력 파일이 만들어집니다 !! 그러나 멈추지 않습니다! 셸에서는이 상태로 유지됩니다.
12/09/11 19:06:24 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/09/11 19:06:25 INFO input.FileInputFormat: Total input paths to process : 1
12/09/11 19:06:25 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/09/11 19:06:25 WARN snappy.LoadSnappy: Snappy native library not loaded
12/09/11 19:07:00 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/09/11 19:07:00 INFO input.FileInputFormat: Total input paths to process : 1
어떻게 그걸 막을 수 있니? 이게 내 주안이야.
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Configuration conf2 = new Configuration();
Job job1 = new Job(conf, "canzoni");
job1.setJarByClass(CanzoniOrdinate.class);
job1.setMapperClass(CanzoniMapper.class);
job1.setReducerClass(CanzoniReducer.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
ControlledJob cJob1 = new ControlledJob(conf);
cJob1.setJob(job1);
FileInputFormat.addInputPath(job1, new Path(args[0]));
FileOutputFormat.setOutputPath(job1, new Path("/user/hduser/tmp"));
Job job2 = new Job(conf2, "songsort");
job2.setJarByClass(CanzoniOrdinate.class);
job2.setMapperClass(CanzoniSorterMapper.class);
job2.setSortComparatorClass(ReverseOrder.class);
job2.setInputFormatClass(KeyValueTextInputFormat.class);
job2.setReducerClass(CanzoniSorterReducer.class);
job2.setMapOutputKeyClass(IntWritable.class);
job2.setMapOutputValueClass(Text.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(IntWritable.class);
ControlledJob cJob2 = new ControlledJob(conf2);
cJob2.setJob(job2);
FileInputFormat.addInputPath(job2, new Path("/user/hduser/tmp/part*"));
FileOutputFormat.setOutputPath(job2, new Path(args[1]));
JobControl jobctrl = new JobControl("jobctrl");
jobctrl.addJob(cJob1);
jobctrl.addJob(cJob2);
cJob2.addDependingJob(cJob1);
jobctrl.run();
////////////////
// NEW CODE ///
//////////////
// delete jobctrl.run();
Thread t = new Thread(jobctrl);
t.start();
String oldStatusJ1 = null;
String oldStatusJ2 = null;
while (!jobctrl.allFinished()) {
String status =cJob1.toString();
String status2 =cJob2.toString();
if (!status.equals(oldStatusJ1)) {
System.out.println(status);
oldStatusJ1 = status;
}
if (!status2.equals(oldStatusJ2)) {
System.out.println(status2);
oldStatusJ2 = status2;
}
}
System.exit(0);
} }
해결법
-
==============================
1.나는 Pietro가 위에 언급 한 것을 본질적으로했다.
나는 Pietro가 위에 언급 한 것을 본질적으로했다.
public class JobRunner implements Runnable { private JobControl control; public JobRunner(JobControl _control) { this.control = _control; } public void run() { this.control.run(); } }
내 맵 / 수업에서 내가 가진 것 :
public void handleRun(JobControl control) throws InterruptedException { JobRunner runner = new JobRunner(control); Thread t = new Thread(runner); t.start(); while (!control.allFinished()) { System.out.println("Still running..."); Thread.sleep(5000); } }
난 그냥 jobControl 개체를 전달합니다.
-
==============================
2.JobControl 객체 자체는 Runnable이므로 다음과 같이 사용할 수 있습니다.
JobControl 객체 자체는 Runnable이므로 다음과 같이 사용할 수 있습니다.
new Thread(myJobControlInstance).start()
-
==============================
3.sinemetu1이 무엇을 공유했는지 코드 스 니펫에 맞게 조정 해보십시오.
sinemetu1이 무엇을 공유했는지 코드 스 니펫에 맞게 조정 해보십시오.
JobController로서의 JobRunner 에의 호출을 그 자체로 끊을 수 있습니다. Runnable를 구현합니다.
Thread thread = new Thread(jobControl); thread.start(); while (!jobControl.allFinished()) { System.out.println("Still running..."); Thread.sleep(5000); }
또한 사용자가 JobControl이 새 스레드에서만 실행될 수 있음을 확인하는 링크를 발견했습니다. https://www.mail-archive.com/common-user@hadoop.apache.org/msg00556.html
-
==============================
4.이 시도:
이 시도:
Thread jcThread = new Thread(jobControl); jcThread.start(); System.out.println("循环判断jobControl运行状态 >>>>>>>>>>>>>>>>"); while (true) { if (jobControl.allFinished()) { System.out.println("====>> jobControl.allFinished=" + jobControl.getSuccessfulJobList()); jobControl.stop(); // 如果不加 break 或者 return,程序会一直循环 break; } if (jobControl.getFailedJobList().size() > 0) { succ = 0; System.out.println("====>> jobControl.getFailedJobList=" + jobControl.getFailedJobList()); jobControl.stop(); // 如果不加 break 或者 return,程序会一直循环 break; } }
from https://stackoverflow.com/questions/12374928/hadoop-mapreduce-chain-jobs-jobcontrol-doesnt-stop by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 하나의지도에 여러 줄의 텍스트 제공 (0) | 2019.06.15 |
---|---|
[HADOOP] RDD가 아닌 Spark 프로그램에서 일반 텍스트 HDFS (또는 로컬) 파일을 쓸 수 있습니까? (0) | 2019.06.15 |
[HADOOP] 먼저 실행되는 것은 파티셔너 또는 결합기입니까? (0) | 2019.06.15 |
[HADOOP] MapReduceBase 및 Mapper가 더 이상 사용되지 않습니다. (0) | 2019.06.15 |
[HADOOP] Flume 설정을 테스트하기 위해 HTTP 소스를 설정하는 방법은 무엇입니까? (0) | 2019.06.15 |