복붙노트

[HADOOP] hadoop 메소드를 사용하여 여러 디렉토리에 출력 보내기

HADOOP

hadoop 메소드를 사용하여 여러 디렉토리에 출력 보내기

내 MapReduce 작업은 데이터를 날짜별로 처리하고 출력을 특정 폴더 구조에 써야합니다. 현재 기대치는 다음과 같은 구조로 생성됩니다.

2013
    01
    02
    ..

2012
    01
    02
    ..

기타

언제든지 최대 12 개월의 데이터 만 얻을 수 있습니다. 따라서 MultipleOutputs 클래스를 사용하여 드라이버에서 다음 함수를 사용하여 12 개의 출력을 만듭니다.

public void createOutputs(){
    Calendar c = Calendar.getInstance();
    String monthStr, pathStr;

    // Create multiple outputs for last 12 months
    // TODO make 12 configurable
    for(int i = 0; i < 12; ++i ){
        //Get month and add 1 as month is 0 based index
        int month = c.get(Calendar.MONTH)+1; 
        //Add leading 0
        monthStr = month > 10 ? "" + month : "0" + month ;  
        // Generate path string in the format 2013/03/etl
        pathStr = c.get(Calendar.YEAR) + "" + monthStr + "etl";
        // Add the named output
        MultipleOutputs.addNamedOutput(config, pathStr );  
        // Move to previous month
        c.add(Calendar.MONTH, -1); 
    }
}

감속기에서 생성 된 출력을 적절한 디렉토리로 이동하는 정리 기능을 추가했습니다.

protected void cleanup(Context context) throws IOException, InterruptedException {
        // Custom function to recursively process data
        moveFiles (FileSystem.get(new Configuration()), new Path("/MyOutputPath"));
}

문제 : 출력이 _temporary 디렉토리에서 출력 디렉토리로 이동되기 전에 감속기의 정리 기능이 실행 중입니다. 그리고 이로 인해 위의 함수는 모든 데이터가 여전히 _temporary 디렉토리에 있으므로 실행시 출력을 볼 수 없습니다.

원하는 기능을 수행하는 가장 좋은 방법은 무엇입니까? 통찰력을 고맙게 여기십시오.

다음과 같은 생각 :

다음은 정리 기능의 파일 구조에 대한 샘플 로그입니다.

MyMapReduce: filepath:hdfs://localhost:8020/dev/test
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs/history/job_201310301015_0224_1383763613843_371979_HtmlEtl
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/201307etl-r-00000
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/part-r-00000

해결법

  1. ==============================

    1.두 번째 직업이 필요하지 않아야합니다. 현재 Multiple Outputs를 사용하여 내 프로그램 중 하나에서 많은 수의 출력 디렉토리를 만듭니다. 거기에 30 디렉토리의 위쪽에도 불구하고 나는 단지 몇 개의 MultipleOutputs 객체를 사용할 수 있습니다. 쓸 때 출력 디렉토리를 설정할 수 있기 때문에 필요한 경우에만 출력 디렉토리를 결정할 수 있기 때문입니다. 다른 형식 (예 : key : Text.class, value : Text.class 및 key : Text.class 및 Value : IntWritable.class)을 사용하여 출력하려는 ​​경우 실제로는 둘 이상의 namedOutput 만 필요합니다.

    두 번째 직업이 필요하지 않아야합니다. 현재 Multiple Outputs를 사용하여 내 프로그램 중 하나에서 많은 수의 출력 디렉토리를 만듭니다. 거기에 30 디렉토리의 위쪽에도 불구하고 나는 단지 몇 개의 MultipleOutputs 객체를 사용할 수 있습니다. 쓸 때 출력 디렉토리를 설정할 수 있기 때문에 필요한 경우에만 출력 디렉토리를 결정할 수 있기 때문입니다. 다른 형식 (예 : key : Text.class, value : Text.class 및 key : Text.class 및 Value : IntWritable.class)을 사용하여 출력하려는 ​​경우 실제로는 둘 이상의 namedOutput 만 필요합니다.

    설정:

    MultipleOutputs.addNamedOutput(job, "Output", TextOutputFormat.class, Text.class, Text.class);
    

    감속기의 설치 :

    mout = new MultipleOutputs<Text, Text>(context);
    

    감속기에서 mout 호출 :

    String key; //set to whatever output key will be
    String value; //set to whatever output value will be
    String outputFileName; //set to absolute path to file where this should write
    
    mout.write("Output",new Text(key),new Text(value),outputFileName);
    

    당신은 코드를 작성하는 동안 디렉토리를 결정할 수 있습니다. 예를 들어 달 및 연도별로 디렉토리를 지정한다고 가정 해보십시오.

    int year;//extract year from data
    int month;//extract month from data
    String baseFileName; //parent directory to all outputs from this job
    String outputFileName = baseFileName + "/" + year + "/" + month;
    
    mout.write("Output",new Text(key),new Text(value),outputFileName);
    

    희망이 도움이됩니다.

    편집 : 출력 파일 구조 위의 예제 :

    Base
        2013
            01
            02
            03
            ...
        2012
            01
            ...
        ...
    
  2. ==============================

    2.아마 당신은 청소에있는 mos를 닫는 것을 놓쳤습니다.

    아마 당신은 청소에있는 mos를 닫는 것을 놓쳤습니다.

    아래와 같이 매퍼 또는 감속기에서 설정 한 경우 :

    public void setup(Context context) {mos = new MultipleOutputs(context);}
    

    당신은 아래처럼 정리 시작시에 모기를 닫아야합니다.

    public void cleanup(Context context ) throws IOException, InterruptedException {mos.close();}
    
  3. from https://stackoverflow.com/questions/19820985/hadoop-method-to-send-output-to-multiple-directories by cc-by-sa and MIT license