복붙노트

[HADOOP] 동적으로 컴파일하고 다른 자바 파일에서 하둡 작업을 실행

HADOOP

동적으로 컴파일하고 다른 자바 파일에서 하둡 작업을 실행

나는 동적으로 컴파일하는 맵리 듀스 작업의 소스 코드를 수신하고 하둡 클러스터에 작업을 실행하는 자바 파일을 작성하려합니다. 이 도달하기 위해, 나는 3 가지 방법을 불리는 컴파일 (), makeJAR ()와 run_Hadoop_Job () 작성했습니다. 모든 JAR 파일의 편집 및 생성에 잘 작동합니다. 작업이 하둡에 제출되는 경우에는, 즉시 작업이 시작, 그것은 발견에 문제가 매퍼 / 감속기 클래스를 필요에 직면하고 Mapper_Class 및 Reducer_Class * (java.lang.ClassNotFoundException가 모두 ClassNotFoundException가 예외 : reza.rCloud.Mapper_Reducer_Classes를 $ Mapper_Class.class) *. 내가 필요한 매퍼 / 감속기 클래스를 참조했지만 내가 몇 후에 그것을 알아낼 수 없습니다 방법에 문제가있을 것을 알고있다. 이 문제를 해결하는 방법에 어떤 도움 / 제안 높게 평가된다.

프로젝트의 세부 사항에 대해서 : 나는 Mapper_Class 및 Reducer_Class에 대한 소스 코드를 포함하는 "rCloud_test / SRC / 레자 / Mapper_Reducer_Classes.java"라는 파일이 있습니다. 이 파일은 궁극적으로 런타임 동안 수신하지만 지금 나는 그것에서 하둡 단어 수 예제를 복사하여 내 주요 클래스 파일과 같은 폴더에 로컬로 저장 : rCloud_test / SRC / 레자 / Platform2.java.

다음은이 프로젝트에 대한 내 기본 클래스 인 Platform2.java의 main () 메소드를 볼 수 아래 :

    public static void main(String[] args){
    System.out.println("Code Execution Started");
    String className = "Mapper_Reducer_Classes";
    Platform2 myPlatform = new Platform2();

    //step 1: compile the received class file dynamically:
    boolean compResult = myPlatform.compile(className); 
    System.out.println(className + ".java compilation result: "+compResult);

    //step 2: make a JAR file out of the compiled file:
    if (compResult) {
        compResult  = myPlatform.makeJAR("jar_file", myPlatform.compilation_Output_Folder);
        System.out.println("JAR creation result: "+compResult);
    } 
    //step 3: Now let's run the Hadoop job:
    if (compResult) {       
        compResult = myPlatform.run_Hadoop_Job(className);
        System.out.println("Running on Hadoop result: "+compResult);
    }

나를 모든 문제를 일으키는 방법은 다음과 같이 인 run_Hadoop_Job ()이다 :

private boolean run_Hadoop_Job(String className){
try{
    System.out.println("*Starting to run the code on Hadoop...");
    String[] argsTemp = { "project_test/input", "project_test/output" };

    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://localhost:54310");
    conf.set("mapred.job.tracker", "localhost:54311");

    conf.set("mapred.jar", jar_Output_Folder + "/jar_file"+".jar");

    conf.set("libjars", required_Execution_Classes);

    //THIS IS WHERE IT CAN'T FIND THE MENTIONED CLASSES, ALTHOUGH THEY EXIST BOTH ON DISK 
    // AND IN THE CREATED JAR FILE:??????
    System.out.println("Getting Mapper/Reducer package name: " + 
                        Mapper_Reducer_Classes.class.getName());
    conf.set("mapreduce.map.class", "reza.rCloud.Mapper_Reducer_Classes$Mapper_Class");
    conf.set("mapreduce.reduce.class", "reza.rCloud.Mapper_Reducer_Classes$Reducer_Class");

    Job job = new Job(conf, "Hadoop Example for dynamically and programmatically compiling-running a job");
    job.setJarByClass(Platform2.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(argsTemp[0]));
    FileSystem fs = FileSystem.get(conf);
    Path out = new Path(argsTemp[1]);
    fs.delete(out, true);
    FileOutputFormat.setOutputPath(job, new Path(argsTemp[1]));

    //job.submit();
    System.out.println("*and now submitting the job to Hadoop...");
    System.exit(job.waitForCompletion(true) ? 0 : 1); 
    System.out.println("Job Finished!");        
} catch (Exception e) {         
            System.out.println("****************Exception!" ); 
            e.printStackTrace();
            return false; 
    }
return true;
}

필요한 경우, 여기에 컴파일 () 메소드의 소스 코드입니다 :

private boolean compile(String className) {
    String fileToCompile =  JOB_FOLDER   + "/" +className+".java";
    JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();       
    FileOutputStream errorStream = null;        
    try{
        errorStream = new FileOutputStream(JOB_FOLDER + "/logs/Errors.txt");
    } catch(FileNotFoundException e){ 
        //if problem creating the file, default wil be console 
    }   

    int compilationResult = 
            compiler.run(   null, null, errorStream, 
                            "-classpath", required_Compilation_Classes,
                            "-d", compilation_Output_Folder,
                            fileToCompile);
    if (compilationResult == 0) {
        //Compilation is successful:
        return true;
    } else {
        //Compilation Failed:
        return false;
    }   
}

및 makeJAR () 메소드에 대한 소스 코드 :

private boolean makeJAR(String outputFileName, String inputDirectory) {
    Manifest manifest = new Manifest();
    manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION,
            "1.0");

    JarOutputStream target = null;                      
    try {       
        target = new JarOutputStream(new FileOutputStream(
                jar_Output_Folder+ "/"  
                + outputFileName+".jar"                      ), manifest);
        add(new File(inputDirectory), target);
    } catch (Exception e) { return false; }
    finally {
        if (target != null)
            try{
                target.close();
            } catch (Exception e) { return false; }
    }
    return true;
}

    private void add(File source, JarOutputStream target) throws IOException
{
  BufferedInputStream in = null;
  try
  {
    if (source.isDirectory())
    {
      String name = source.getPath().replace("\\", "/");
      if (!name.isEmpty())
      {
        if (!name.endsWith("/"))
          name += "/";
        JarEntry entry = new JarEntry(name);
        entry.setTime(source.lastModified());
        target.putNextEntry(entry);
        target.closeEntry();
      }
      for (File nestedFile: source.listFiles())
        add(nestedFile, target);
      return;
    }

    JarEntry entry = new JarEntry(source.getPath().replace("\\", "/"));
    entry.setTime(source.lastModified());
    target.putNextEntry(entry);
    in = new BufferedInputStream(new FileInputStream(source));

    byte[] buffer = new byte[1024];
    while (true)
    {
      int count = in.read(buffer);
      if (count == -1)
        break;
      target.write(buffer, 0, count);
    }
    target.closeEntry();
  }
  finally
  {
    if (in != null)
      in.close();
  }
}

마지막으로 파일을 액세스에 사용되는 매개 변수를 고정 :

private String JOB_FOLDER = "/Users/reza/My_Software/rCloud_test/src/reza/rCloud";
private String HADOOP_SOURCE_FOLDER = "/Users/reza/My_Software/hadoop-0.20.2";
private String required_Compilation_Classes = HADOOP_SOURCE_FOLDER + "/hadoop-0.20.2-core.jar";
private String required_Execution_Classes = required_Compilation_Classes + "," +
     "/Users/reza/My_Software/ActorFoundry_dist_ver/lib/commons-cli-1.1.jar," +
     "/Users/reza/My_Software/ActorFoundry_dist_ver/lib/commons-logging-1.1.1.jar";
public String compilation_Output_Folder = "/Users/reza/My_Software/rCloud_test/dyn_classes";
private String jar_Output_Folder = "/Users/reza/My_Software/rCloud_test/dyn_jar";

PLATFORM2를 실행 한 결과, 디스크에있는 프로젝트의 구조는 다음과 같습니다 :

rCloud_test / 클래스 / 레자 / rCloud / Platform2.class는 다음 PLATFORM2 클래스를 포함 rCloud_test / dyn_classes / 레자 / rCloud /가 Mapper_Reducer_Classes.class을위한 클래스를 포함, Mapper_Reducer_Classes $ Mapper_Class.class 및 Mapper_Reducer_Classes $ Reducer_Class.class rCloud_test / dyn_jar / jar_file.jar는 생성 된 jar 파일을 포함

REVSED : 여기 rCloud_test / SRC / 레자의 소스 코드입니다 / rCloud / Mapper_Reducer_Classes.java :

package reza.rCloud;

import java.io.IOException;
import java.lang.InterruptedException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Mapper_Reducer_Classes {
/**
 * The map class of WordCount.
 */
public static class Mapper_Class
    extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}
/**
 * The reducer class of WordCount
 */
public static class Reducer_Class
    extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
}

해결법

  1. ==============================

    1.setClass () 메소드를 사용하여 그것들을 설정하려고 :

    setClass () 메소드를 사용하여 그것들을 설정하려고 :

    conf.setClass("mapreduce.map.class", 
                  Class.forName("reza.rCloud.Mapper_Reducer_Classes$Mapper_Class"),
                  Mapper.class);
    
    conf.setClass("mapreduce.reduce.class",
                  Class.forName("reza.rCloud.Mapper_Reducer_Classes$Reducer_Class"),
                  Reducer.class);
    
  2. from https://stackoverflow.com/questions/14071496/dynamically-compiling-and-running-a-hadoop-job-from-another-java-file by cc-by-sa and MIT license