복붙노트

[HADOOP] 출력을 hdfs에 저장하지 않고 mapper-reducer의 출력을 다른 mapper-reducer로 직접 보내는 방법

HADOOP

출력을 hdfs에 저장하지 않고 mapper-reducer의 출력을 다른 mapper-reducer로 직접 보내는 방법

문제 해결 결국 내 솔루션을 확인하십시오

최근에 저는 Mahout in Action에서 chaper6 (listing 6.1 ~ 6.4)의 추천 예제를 실행하려고합니다. 하지만 문제가 발생하여 주위에 봤지만 솔루션을 찾을 수 없습니다.

문제는 다음과 같습니다. 한 쌍의 매퍼 감속기가 있습니다.

public final class WikipediaToItemPrefsMapper extends
    Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {

private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String line = value.toString();
    Matcher m = NUMBERS.matcher(line);
    m.find();
    VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
    VarLongWritable itemID = new VarLongWritable();
    while (m.find()) {
        itemID.set(Long.parseLong(m.group()));
        context.write(userID, itemID);
    }
}
}

public class WikipediaToUserVectorReducer
    extends
    Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {

public void reduce(VarLongWritable userID,
        Iterable<VarLongWritable> itemPrefs, Context context)
        throws IOException, InterruptedException {
    Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (VarLongWritable itemPref : itemPrefs) {
        userVector.set((int) itemPref.get(), 1.0f);
    }
    context.write(userID, new VectorWritable(userVector));
}
}

감속기는 userID와 userVector를 출력하고 다음과 같이 표시됩니다. 98955 {590 : 1.0 22 : 1.0 9059 : 1.0 3 : 1.0 2 : 1.0 1 : 1.0}

그런 다음이 데이터를 처리하기 위해 다른 쌍의 매퍼 감속기를 사용하려고합니다.

public class UserVectorSplitterMapper
    extends
    Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

public void map(VarLongWritable key, VectorWritable value, Context context)
        throws IOException, InterruptedException {
    long userID = key.get();
    Vector userVector = value.get();
    Iterator<Vector.Element> it = userVector.iterateNonZero();
    IntWritable itemIndexWritable = new IntWritable();
    while (it.hasNext()) {
        Vector.Element e = it.next();
        int itemIndex = e.index();
        float preferenceValue = (float) e.get();
        itemIndexWritable.set(itemIndex);
        context.write(itemIndexWritable, 
                new VectorOrPrefWritable(userID, preferenceValue));
    }
}
}

작업을 실행하려고하면 오류가 발생합니다.

첫 번째 매퍼 - 감속기가 hdfs에 출력을 쓰고 두 번째 매퍼 - 감속기가 출력을 읽으려고하면 매퍼는 98955를 VarLongWritable로 캐스팅 할 수 있지만 변환 할 수는 없습니다 {590 : 1.0 22 : 1.0 9059 : 1.0 3 : 1.0 2 : 1.0 1 : 1.0} VectorWritable에, 그래서 첫 번째 매퍼 - 감속기 직접 출력을 두 번째 쌍으로 보낼 수있는 방법이 궁금하다. 데이터를 변환 할 필요가 없습니다. 나는 행동에서 하둡을 바라 보았다. 그리고 확실한 가이드, 그것을 할 수있는 그런 방법이없는 것 같다, 어떤 제안?

솔루션 : SequenceFileOutputFormat을 사용하여 DFS에서 첫 번째 MapReduce 워크 플로우의 축소 결과를 출력하고 저장할 수 있습니다. 두 번째 MapReduce 워크 플로는 매퍼를 만들 때 SequenceFileInputFormat 클래스를 매개 변수로 사용하여 임시 파일을 입력으로 읽을 수 있습니다. 벡터는 특정 형식을 가진 바이너리 시퀀스 파일에 저장되므로 SequenceFileInputFormat은이를 읽고이를 벡터 형식으로 다시 변환 할 수 있습니다.

다음은 몇 가지 예제 코드입니다.

confFactory ToItemPrefsWorkFlow = new confFactory
            (new Path("/dbout"), //input file path
             new Path("/mahout/output.txt"), //output file path
             TextInputFormat.class, //input format
             VarLongWritable.class, //mapper key format
             Item_Score_Writable.class, //mapper value format
             VarLongWritable.class, //reducer key format
             VectorWritable.class, //reducer value format
             **SequenceFileOutputFormat.class** //The reducer output format             

    );
    ToItemPrefsWorkFlow.setMapper( WikipediaToItemPrefsMapper.class);
    ToItemPrefsWorkFlow.setReducer(WikipediaToUserVectorReducer.class);
    JobConf conf1 = ToItemPrefsWorkFlow.getConf();


    confFactory UserVectorToCooccurrenceWorkFlow = new confFactory
            (new Path("/mahout/output.txt"),
             new Path("/mahout/UserVectorToCooccurrence"),
             SequenceFileInputFormat.class, //notice that the input format of mapper of the second work flow is now SequenceFileInputFormat.class
             //UserVectorToCooccurrenceMapper.class,
             IntWritable.class,
             IntWritable.class,
             IntWritable.class,
             VectorWritable.class,
             SequenceFileOutputFormat.class                                      
             );
     UserVectorToCooccurrenceWorkFlow.setMapper(UserVectorToCooccurrenceMapper.class);
     UserVectorToCooccurrenceWorkFlow.setReducer(UserVectorToCooccurrenceReducer.class);
    JobConf conf2 = UserVectorToCooccurrenceWorkFlow.getConf();

    JobClient.runJob(conf1);
    JobClient.runJob(conf2);

이 문제가 있으면 언제든지 저에게 연락하십시오.

해결법

  1. ==============================

    1.SequenceFileOutputFormat을 사용하고 출력 키 및 값 클래스를 정의하도록 첫 번째 작업의 출력을 명시 적으로 구성해야합니다.

    SequenceFileOutputFormat을 사용하고 출력 키 및 값 클래스를 정의하도록 첫 번째 작업의 출력을 명시 적으로 구성해야합니다.

    job.setOutputFormat(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(VarLongWritable.class);
    job.setOutputKeyClass(VectorWritable.class);
    

    드라이버 코드를 보지 않고도 TextOutputFormat을 첫 번째 작업의 출력으로 사용하고 TextInputFormat을 두 번째 입력으로 사용한다고 추측합니다.이 입력 형식은 두 번째 매퍼에 쌍을 전송합니다

  2. ==============================

    2.나는 초보자이다. 대답에 대한 나의 추측 일 뿐이므로, 그것과 함께 참아주십시오. / 그것이 순진한 것처럼 보인다면 지적하십시오.

    나는 초보자이다. 대답에 대한 나의 추측 일 뿐이므로, 그것과 함께 참아주십시오. / 그것이 순진한 것처럼 보인다면 지적하십시오.

    HDFS를 절약하지 않고 감속기에서 다음 매퍼로 보내는 것이 타당하지 않다고 생각합니다. "어떤 데이터가 어떤 매퍼로 이동 하는가"는 지역성 기준을 충족하도록 우아하게 설계되었으므로 (로컬로 저장된 데이터가있는 매퍼 노드로 이동).

    HDFS에 저장하지 않으면 대부분의 데이터가 네트워크를 통해 전송되어 속도가 느려지고 대역폭 문제가 발생할 수 있습니다.

  3. ==============================

    3.두 번째 사람이 사용할 수 있도록 첫 번째 map-reduce의 출력을 임시로 저장해야합니다.

    두 번째 사람이 사용할 수 있도록 첫 번째 map-reduce의 출력을 임시로 저장해야합니다.

    이렇게하면 첫 번째 map-reduce의 결과가 두 번째 결과로 전달되는 방식을 이해하는 데 도움이됩니다. (이것은 Apache nutch의 Generator.java를 기반으로합니다).

    이것은 첫 번째 map-reduce의 출력을위한 임시 디렉토리입니다.

    Path tempDir =
      new Path(getConf().get("mapred.temp.dir", ".")
               + "/job1-temp-"
               + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
    

    첫 번째지도 - 축소 작업 설정 :

    JobConf job1 = getConf();
    job1.setJobName("job 1");
    FileInputFormat.addInputPath(...);
    sortJob.setMapperClass(...);
    
    FileOutputFormat.setOutputPath(job1, tempDir);
    job1.setOutputFormat(SequenceFileOutputFormat.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(...);
    JobClient.runJob(job1);
    

    작업 디렉토리에 출력 디렉토리가 설정되어 있는지 확인하십시오. 두 번째 작업에서 이것을 사용하십시오 :

    JobConf job2 = getConf();
    FileInputFormat.addInputPath(job2, tempDir);
    job2.setReducerClass(...);
    JobClient.runJob(job2);
    

    당신의 일이 끝나면 임시 방편을 정리하는 것을 잊지 마십시오 :

    // clean up
    FileSystem fs = FileSystem.get(getConf());
    fs.delete(tempDir, true);
    

    희망이 도움이됩니다.

  4. from https://stackoverflow.com/questions/10266367/how-to-directly-send-the-output-of-a-mapper-reducer-to-a-another-mapper-reducer by cc-by-sa and MIT license