복붙노트

[HADOOP] 지도 파일을 MapReduce 작업의 입력으로 사용

HADOOP

지도 파일을 MapReduce 작업의 입력으로 사용

최근에 Hadoop을 사용하기 시작했으며 Mapfile을 MapReduce 작업의 입력으로 사용하는 동안 문제가 있습니다.

다음 작업 코드는 hdfs에 "TestMap"이라는 간단한 MapFile을 씁니다. 여기에는 Text 유형의 세 키와 BytesWritable 유형의 세 가지 값이 있습니다.

TestMap의 내용은 다음과 같습니다.

$ hadoop fs  -text /user/hadoop/TestMap/data
11/01/20 11:17:58 INFO util.NativeCodeLoader: Loaded the native-hadoop library
11/01/20 11:17:58 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
11/01/20 11:17:58 INFO compress.CodecPool: Got brand-new decompressor
A    01
B    02
C    03

다음은 TestMap Mapfile을 생성하는 프로그램입니다 :

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.IOUtils;

public class CreateMap {

    public static void main(String[] args) throws IOException{

        Configuration conf = new Configuration();
        FileSystem hdfs  = FileSystem.get(conf);

        Text key = new Text();
        BytesWritable value = new BytesWritable();
        byte[] data = {1, 2, 3};
        String[] strs = {"A", "B", "C"};
        int bytesRead;
        MapFile.Writer writer = null;

        writer = new MapFile.Writer(conf, hdfs, "TestMap", key.getClass(), value.getClass());
        try {
            for (int i = 0; i < 3; i++) {
                key.set(strs[i]);
                value.set(data, i, 1);
                writer.append(key, value);
                System.out.println(strs[i] + ":" + data[i] + " added.");
            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        finally {
             IOUtils.closeStream(writer);
        }
    }
}

다음에 나오는 단순한 MapReduce 작업은 맵 파일의 값을 하나씩 증가 시키려고 시도합니다.

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.BytesWritable;


public class AddOne extends Configured implements Tool {

    public static class MapClass extends MapReduceBase

        implements Mapper<Text, BytesWritable, Text, Text> {

        public void map(Text key, BytesWritable value,
                        OutputCollector<Text, Text> output,
                        Reporter reporter) throws IOException {


            byte[] data = value.getBytes();
            data[0] += 1;
            value.set(data, 0, 1);
            output.collect(key, new Text(value.toString()));
        }
    }

    public static class Reduce extends MapReduceBase
        implements Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, Text> output,
                           Reporter reporter) throws IOException {

            output.collect(key, values.next());
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        JobConf job = new JobConf(conf, AddOne.class);

        Path in = new Path("TestMap");
        Path out = new Path("output");
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("AddOne");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormat(SequenceFileInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.set("key.value.separator.in.input.line", ":");


        JobClient.runJob(job);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AddOne(), args);

        System.exit(res);
    }
}

내가 얻는 런타임 예외는 다음과 같습니다.

java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.io.BytesWritable
    at AddOne$MapClass.map(AddOne.java:32)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)

왜 hadoop이 LongWritable을 캐스팅하려고하는지 이해하지 못합니다. 왜냐하면 내 코드에서 Mapper 인터페이스를 올바르게 정의하기 때문입니다 (Mapper ).

누군가 나를 도울 수 있을까요?

대단히 감사합니다.

루카

해결법

  1. ==============================

    1.귀하의 문제는 이름에서 알 수 있듯이 MapFile은 파일이 아니기 때문에 발생합니다.

    귀하의 문제는 이름에서 알 수 있듯이 MapFile은 파일이 아니기 때문에 발생합니다.

    MapFile은 실제로 두 개의 파일로 구성된 디렉토리입니다. "데이터"파일이 있습니다.이 파일은 사용자가 작성한 키와 값을 포함하는 SequenceFile입니다. 그러나 "Index"파일도 있습니다.이 파일은 LongWritables와 같은 오프셋과 함께 키의 하위 시퀀스를 포함하는 다른 SequenceFile입니다. 이 인덱스는 MapFile.Reader에 의해 메모리에로드되어 랜덤 액세스 할 때 원하는 데이터가있는 데이터 파일의 오프셋을 신속하게 검색 할 수 있습니다.

    오래된 "org.apache.hadoop.mapred"버전의 SequenceFileInputFormat을 사용하고 있습니다. MapFile을 입력으로 보도록 지시 할 때만 데이터 파일을 보는 것이 현명하지 않습니다. 대신 실제로 데이터 파일과 색인 파일을 일반 입력 파일로 사용하려고 시도합니다. 데이터 파일은 클래스가 사용자가 지정한 것과 일치하기 때문에 올바르게 작동하지만 색인 파일 값은 모두 LongWritable이므로 색인 파일은 ClassCastException을 발생시킵니다.

    두 가지 옵션이 있습니다 : "org.apache.hadoop.mapreduce"버전의 SequenceFileInputFormat을 사용할 수 있습니다 (코드의 다른 부분을 변경 함). 데이터 파일을보기 위해 MapFiles에 대해 충분히 알고 있습니다. 대신 데이터 파일을 명시 적으로 입력 파일로 제공 할 수 있습니다.

  2. ==============================

    2.접근법 중 하나는 전체 MapFile 블록에 대해 하나의 레코드로 사용자 정의 InputFormat을 사용하고 map ()에서 키로 룩업 할 수 있습니다.

    접근법 중 하나는 전체 MapFile 블록에 대해 하나의 레코드로 사용자 정의 InputFormat을 사용하고 map ()에서 키로 룩업 할 수 있습니다.

  3. ==============================

    3.KeyValueTextInputFormat.class를 사용하여 동일한 문제를 해결했습니다.

    KeyValueTextInputFormat.class를 사용하여 동일한 문제를 해결했습니다.

    에서 전체 접근 방식을 언급했다.

    http://sanketraut.blogspot.in/2012/06/hadoop-example-setting-up-hadoop-on.html

  4. from https://stackoverflow.com/questions/4753470/mapfile-as-a-input-to-a-mapreduce-job by cc-by-sa and MIT license