[HADOOP] Hadoop에서 여러 MapReduce 작업 체인화
HADOOPHadoop에서 여러 MapReduce 작업 체인화
MapReduce를 적용하는 많은 실제 상황에서 최종 알고리즘은 결국 여러 MapReduce 단계가됩니다.
즉 Map1, Reduce1, Map2, Reduce2 등이 있습니다.
따라서 다음지도의 입력으로 필요한 마지막 축소 출력을 얻을 수 있습니다.
중간 데이터는 일단 (일반적으로) 파이프 라인이 성공적으로 완료되면 유지하기를 원하지 않는 것입니다. 또한이 중간 데이터는 일반적으로 일부 데이터 구조 (예 : '지도'또는 '집합')이므로 이러한 키 - 값 쌍을 쓰고 읽는 데 너무 많은 노력을 기울 이길 원하지 않습니다.
Hadoop에서 권장되는 방법은 무엇입니까?
이 중간 데이터를 올바른 방법으로 처리하는 방법을 보여주는 간단한 (간단한) 예가 있습니까?
해결법
-
==============================
1.제 생각에는 야후의 개발자 네트워크에 관한이 튜토리얼은 다음과 같이 도움이 될 것입니다 : Chaining Jobs
제 생각에는 야후의 개발자 네트워크에 관한이 튜토리얼은 다음과 같이 도움이 될 것입니다 : Chaining Jobs
JobClient.runJob ()을 사용합니다. 첫 번째 작업의 데이터 출력 경로가 두 번째 작업의 입력 경로가됩니다. 이들은 적절한 코드로 작업에 인수로 전달되어 작업을 분석하고 매개 변수를 설정해야합니다.
위의 방법은 이전 mapred API에서 사용한 방법과 비슷하지만 여전히 작동해야한다고 생각합니다. 새로운 mapreduce API에도 비슷한 메소드가 있지만 그게 무엇인지는 잘 모르겠습니다.
작업이 끝나면 중간 데이터를 제거하는 한 코드에서이를 수행 할 수 있습니다. 내가 전에 해본 방법은 다음과 같은 것을 사용하고있다.
FileSystem.delete(Path f, boolean recursive);
여기서 경로는 데이터의 HDFS에서의 위치입니다. 다른 작업이 필요하지 않으면이 데이터 만 삭제해야합니다.
-
==============================
2.당신이 그것을 할 수있는 많은 방법이 있습니다.
당신이 그것을 할 수있는 많은 방법이 있습니다.
(1) 계단식 작업
첫 번째 작업에 대해 JobConf 오브젝트 "job1"을 작성하고 "input"을 입력 디렉토리로, "temp"를 출력 디렉토리로 모든 매개 변수를 설정하십시오. 이 작업을 실행하십시오 :
JobClient.run(job1).
바로 아래에 두 번째 작업에 대한 JobConf 객체 "job2"를 만들고 "temp"를 input 디렉토리로, "output"을 출력 디렉토리로 사용하여 모든 매개 변수를 설정합니다. 이 작업을 실행하십시오 :
JobClient.run(job2).
(2) JobClient.run을 사용하지 않는다는 점을 제외하고 두 개의 JobConf 객체를 만들고 (1)처럼 모든 매개 변수를 설정합니다.
그런 다음 jobconfs를 매개 변수로 사용하여 두 개의 Job 객체를 만듭니다.
Job job1=new Job(jobconf1); Job job2=new Job(jobconf2);
jobControl 객체를 사용하여 작업 종속성을 지정한 다음 작업을 실행합니다.
JobControl jbcntrl=new JobControl("jbcntrl"); jbcntrl.addJob(job1); jbcntrl.addJob(job2); job2.addDependingJob(job1); jbcntrl.run();
(3) 맵 + | 축소 | Map *을 사용하면 Hadoop 버전 0.19 이상과 함께 제공되는 ChainMapper 및 ChainReducer 클래스를 사용할 수 있습니다.
건배
-
==============================
3.실제로이 작업을 수행하는 데는 여러 가지 방법이 있습니다. 나는 두 가지에 초점을 맞출 것이다.
실제로이 작업을 수행하는 데는 여러 가지 방법이 있습니다. 나는 두 가지에 초점을 맞출 것이다.
하나는 Riffle (http://github.com/cwensel/riffle)을 통해 종속물을 식별하고 의존성 (토폴로지) 순서로 '실행'하기위한 주석 라이브러리입니다.
또는 Cascade (http://www.cascading.org/)에서 Cascade (및 MapReduceFlow)를 사용할 수 있습니다. 이후 버전에서는 Riffle 주석을 지원할 예정이지만 원시 MR JobConf 작업에서는 훌륭하게 작동합니다.
이에 대한 변형은 MR 작업을 전혀 손으로 관리하지 않고 계단식 API를 사용하여 응용 프로그램을 개발하는 것입니다. 그런 다음 JobConf와 작업 체이닝은 계단식 플래너와 Flow 클래스를 통해 내부적으로 처리됩니다.
이렇게하면 Hadoop 작업 관리 등의 메커니즘에 집중하는 것이 아니라 문제에 집중하는 데 시간을 투자 할 수 있습니다. 심지어 clojure 또는 jruby와 같은 다른 언어를 계층화하여 개발 및 응용 프로그램을 더욱 단순화 할 수도 있습니다. http://www.cascading.org/modules.html
-
==============================
4.JobConf 객체를 사용하여 작업 체인을 수행했습니다. 저는 작업을 묶는 WordCount 예제를 사용했습니다. 하나의 직업은 주어진 산출에서 단어가 몇 번 반복되는지를 계산합니다. 두 번째 작업은 첫 번째 작업 출력을 입력으로 취하여 주어진 입력의 전체 단어를 계산합니다. 다음은 Driver 클래스에 배치해야하는 코드입니다.
JobConf 객체를 사용하여 작업 체인을 수행했습니다. 저는 작업을 묶는 WordCount 예제를 사용했습니다. 하나의 직업은 주어진 산출에서 단어가 몇 번 반복되는지를 계산합니다. 두 번째 작업은 첫 번째 작업 출력을 입력으로 취하여 주어진 입력의 전체 단어를 계산합니다. 다음은 Driver 클래스에 배치해야하는 코드입니다.
//First Job - Counts, how many times a word encountered in a given file JobConf job1 = new JobConf(WordCount.class); job1.setJobName("WordCount"); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(IntWritable.class); job1.setMapperClass(WordCountMapper.class); job1.setCombinerClass(WordCountReducer.class); job1.setReducerClass(WordCountReducer.class); job1.setInputFormat(TextInputFormat.class); job1.setOutputFormat(TextOutputFormat.class); //Ensure that a folder with the "input_data" exists on HDFS and contains the input files FileInputFormat.setInputPaths(job1, new Path("input_data")); //"first_job_output" contains data that how many times a word occurred in the given file //This will be the input to the second job. For second job, input data name should be //"first_job_output". FileOutputFormat.setOutputPath(job1, new Path("first_job_output")); JobClient.runJob(job1); //Second Job - Counts total number of words in a given file JobConf job2 = new JobConf(TotalWords.class); job2.setJobName("TotalWords"); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(IntWritable.class); job2.setMapperClass(TotalWordsMapper.class); job2.setCombinerClass(TotalWordsReducer.class); job2.setReducerClass(TotalWordsReducer.class); job2.setInputFormat(TextInputFormat.class); job2.setOutputFormat(TextOutputFormat.class); //Path name for this job should match first job's output path name FileInputFormat.setInputPaths(job2, new Path("first_job_output")); //This will contain the final output. If you want to send this jobs output //as input to third job, then third jobs input path name should be "second_job_output" //In this way, jobs can be chained, sending output one to other as input and get the //final output FileOutputFormat.setOutputPath(job2, new Path("second_job_output")); JobClient.runJob(job2);
이러한 작업을 실행하는 명령은 다음과 같습니다.
bin / hadoop jar 토탈 워드.
우리는 명령에 대한 최종 작업 이름을 제공해야합니다. 위의 경우 TotalWords입니다.
-
==============================
5.MR 체인은 코드에 나와있는 방식대로 실행할 수 있습니다. 주의 사항 : 드라이버 코드 만 제공되었습니다.
MR 체인은 코드에 나와있는 방식대로 실행할 수 있습니다. 주의 사항 : 드라이버 코드 만 제공되었습니다.
public class WordCountSorting { // here the word keys shall be sorted //let us write the wordcount logic first public static void main(String[] args)throws IOException,InterruptedException,ClassNotFoundException { //THE DRIVER CODE FOR MR CHAIN Configuration conf1=new Configuration(); Job j1=Job.getInstance(conf1); j1.setJarByClass(WordCountSorting.class); j1.setMapperClass(MyMapper.class); j1.setReducerClass(MyReducer.class); j1.setMapOutputKeyClass(Text.class); j1.setMapOutputValueClass(IntWritable.class); j1.setOutputKeyClass(LongWritable.class); j1.setOutputValueClass(Text.class); Path outputPath=new Path("FirstMapper"); FileInputFormat.addInputPath(j1,new Path(args[0])); FileOutputFormat.setOutputPath(j1,outputPath); outputPath.getFileSystem(conf1).delete(outputPath); j1.waitForCompletion(true); Configuration conf2=new Configuration(); Job j2=Job.getInstance(conf2); j2.setJarByClass(WordCountSorting.class); j2.setMapperClass(MyMapper2.class); j2.setNumReduceTasks(0); j2.setOutputKeyClass(Text.class); j2.setOutputValueClass(IntWritable.class); Path outputPath1=new Path(args[1]); FileInputFormat.addInputPath(j2, outputPath); FileOutputFormat.setOutputPath(j2, outputPath1); outputPath1.getFileSystem(conf2).delete(outputPath1, true); System.exit(j2.waitForCompletion(true)?0:1); } }
순번 (JOB1) MAP → REDUCE → (JOB2) MAP 이것은 정렬 된 키를 얻기 위해 수행되었지만 트리 맵 사용과 같은 다른 방법이 있습니다. 그러나 나는 일이 묶인 방식에주의를 집중하고 싶다 !! 고맙습니다
-
==============================
6.MapReduce 작업을 처리하기 위해 oozie를 사용할 수 있습니다. http://issues.apache.org/jira/browse/HADOOP-5303
MapReduce 작업을 처리하기 위해 oozie를 사용할 수 있습니다. http://issues.apache.org/jira/browse/HADOOP-5303
-
==============================
7.Apache Mahout 프로젝트에는 여러 MapReduce 작업을 함께 묶는 예제가 있습니다. 예제 중 하나는 다음에서 찾을 수 있습니다.
Apache Mahout 프로젝트에는 여러 MapReduce 작업을 함께 묶는 예제가 있습니다. 예제 중 하나는 다음에서 찾을 수 있습니다.
RecommenderJob.java
http://search-lucene.com/c/Mahout:/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java%7C%7CRecommenderJob
-
==============================
8.작업의 waitForCompletion (true) 메소드를 사용하여 작업 간의 종속성을 정의 할 수 있습니다.
작업의 waitForCompletion (true) 메소드를 사용하여 작업 간의 종속성을 정의 할 수 있습니다.
제 시나리오에는 서로에게 의존하는 3 가지 일자리가있었습니다. 드라이버 클래스에서 나는 아래 코드를 사용했으며 예상대로 작동합니다.
public static void main(String[] args) throws Exception { // TODO Auto-generated method stub CCJobExecution ccJobExecution = new CCJobExecution(); Job distanceTimeFraudJob = ccJobExecution.configureDistanceTimeFraud(new Configuration(),args[0], args[1]); Job spendingFraudJob = ccJobExecution.configureSpendingFraud(new Configuration(),args[0], args[1]); Job locationFraudJob = ccJobExecution.configureLocationFraud(new Configuration(),args[0], args[1]); System.out.println("****************Started Executing distanceTimeFraudJob ================"); distanceTimeFraudJob.submit(); if(distanceTimeFraudJob.waitForCompletion(true)) { System.out.println("=================Completed DistanceTimeFraudJob================= "); System.out.println("=================Started Executing spendingFraudJob ================"); spendingFraudJob.submit(); if(spendingFraudJob.waitForCompletion(true)) { System.out.println("=================Completed spendingFraudJob================= "); System.out.println("=================Started locationFraudJob================= "); locationFraudJob.submit(); if(locationFraudJob.waitForCompletion(true)) { System.out.println("=================Completed locationFraudJob================="); } } } }
-
==============================
9.새로운 클래스 org.apache.hadoop.mapreduce.lib.chain.ChainMapper가이 시나리오를 돕습니다.
새로운 클래스 org.apache.hadoop.mapreduce.lib.chain.ChainMapper가이 시나리오를 돕습니다.
-
==============================
10.복잡한 서버 기반 Hadoop 워크 플로우 엔진 (예 : oozie)이 있지만 워크 플로로 여러 Hadoop 작업을 실행할 수있는 간단한 Java 라이브러리가 있습니다. 작업 간 종속성을 정의하는 작업 구성 및 워크 플로는 JSON 파일에서 구성됩니다. 모든 것이 외부에서 구성 가능하며 기존 맵에서 변경을 필요로하지 않으므로 구현을 워크 플로우의 일부로 축소 할 수 있습니다.
복잡한 서버 기반 Hadoop 워크 플로우 엔진 (예 : oozie)이 있지만 워크 플로로 여러 Hadoop 작업을 실행할 수있는 간단한 Java 라이브러리가 있습니다. 작업 간 종속성을 정의하는 작업 구성 및 워크 플로는 JSON 파일에서 구성됩니다. 모든 것이 외부에서 구성 가능하며 기존 맵에서 변경을 필요로하지 않으므로 구현을 워크 플로우의 일부로 축소 할 수 있습니다.
자세한 내용은 여기를 참조하십시오. 소스 코드와 jar는 github에서 사용할 수 있습니다.
http://pkghosh.wordpress.com/2011/05/22/hadoop-orchestration/
프라 너브
-
==============================
11.오지는 필연적 인 일이 이전 직무에서 직접 입력을받는 데 도움이된다고 생각합니다. 이렇게하면 작업 제어로 수행되는 I / O 작업이 수행되지 않습니다.
오지는 필연적 인 일이 이전 직무에서 직접 입력을받는 데 도움이된다고 생각합니다. 이렇게하면 작업 제어로 수행되는 I / O 작업이 수행되지 않습니다.
-
==============================
12.프로그래밍 방식으로 작업을 체인화하려면 JobControl을 사용해야합니다. 사용법은 매우 간단합니다.
프로그래밍 방식으로 작업을 체인화하려면 JobControl을 사용해야합니다. 사용법은 매우 간단합니다.
JobControl jobControl = new JobControl(name);
그 후에 ControlledJob 인스턴스를 추가합니다. ControlledJob은 의존성이있는 작업을 정의하므로 작업의 "체인"에 맞게 입력 및 출력을 자동으로 연결합니다.
jobControl.add(new ControlledJob(job, Arrays.asList(controlledjob1, controlledjob2)); jobControl.run();
체인을 시작합니다. 이를 별도의 스레드에 넣기를 원할 것입니다. 이렇게하면 실행중인 체인 채찍의 상태를 확인할 수 있습니다.
while (!jobControl.allFinished()) { System.out.println("Jobs in waiting state: " + jobControl.getWaitingJobList().size()); System.out.println("Jobs in ready state: " + jobControl.getReadyJobsList().size()); System.out.println("Jobs in running state: " + jobControl.getRunningJobList().size()); List<ControlledJob> successfulJobList = jobControl.getSuccessfulJobList(); System.out.println("Jobs in success state: " + successfulJobList.size()); List<ControlledJob> failedJobList = jobControl.getFailedJobList(); System.out.println("Jobs in failed state: " + failedJobList.size()); }
-
==============================
13.MRJob1의 o / p를 MRJob2의 i / p로 지정하려는 요구 사항에서 언급했듯이이 용도로 oozie 워크 플로를 사용할 수 있습니다. 또한 중간 데이터를 다음 MRJob에서 사용할 것이므로 HDFS에 기록하는 것도 고려해 볼 수 있습니다. 작업이 완료되면 중간 데이터를 정리할 수 있습니다.
MRJob1의 o / p를 MRJob2의 i / p로 지정하려는 요구 사항에서 언급했듯이이 용도로 oozie 워크 플로를 사용할 수 있습니다. 또한 중간 데이터를 다음 MRJob에서 사용할 것이므로 HDFS에 기록하는 것도 고려해 볼 수 있습니다. 작업이 완료되면 중간 데이터를 정리할 수 있습니다.
<start to="mr-action1"/> <action name="mr-action1"> <!-- action for MRJob1--> <!-- set output path = /tmp/intermediate/mr1--> <ok to="end"/> <error to="end"/> </action> <action name="mr-action2"> <!-- action for MRJob2--> <!-- set input path = /tmp/intermediate/mr1--> <ok to="end"/> <error to="end"/> </action> <action name="success"> <!-- action for success--> <ok to="end"/> <error to="end"/> </action> <action name="fail"> <!-- action for fail--> <ok to="end"/> <error to="end"/> </action> <end name="end"/>
from https://stackoverflow.com/questions/2499585/chaining-multiple-mapreduce-jobs-in-hadoop by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] Hadoop 작업을 만들기 위해 mapred 또는 mapreduce 패키지를 사용하는 것이 더 좋습니까? (0) | 2019.05.28 |
---|---|
[HADOOP] 손상된 HDFS 파일을 수정하는 방법 (0) | 2019.05.28 |
[HADOOP] hadoop map 보조 정렬 줄이기 (0) | 2019.05.28 |
[HADOOP] Java 프로그램에서 Sqoop을 사용하는 방법? (0) | 2019.05.28 |
[HADOOP] 각 파티션의 요소 수가 같은 동일한 크기의 파티션으로 구성된 Spark RDD의 사용자 정의 파티션을 정의하는 방법은 무엇입니까? (0) | 2019.05.28 |