
[HADOOP] Running Hadoop with compressed files as input; data read by Hadoop is out of sequence; NumberFormatException

HADOOP


After changing the properties in mapred-site.xml, I provided tar.bz2, .gz, and tar.gz files as input. None of these worked. My assumption is that the records Hadoop reads from the input are out of sequence: one column of my input is a string and the other is an integer, but since some of it is read out of order, Hadoop reads the string part as an integer while reading columns from the compressed file and throws a NumberFormatException. I am a newbie, and I would like to know whether the problem is in my configuration or in my code.

The properties in core-site.xml are:

<property>
  <name>io.compression.codecs</name>
  <value>org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.SnappyCodec</value>
  <description>A list of the compression codec classes that can be used for compression/decompression.</description>
</property>
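
As I understand it, TextInputFormat chooses a codec purely by file extension, via CompressionCodecFactory over this codec list. A minimal sketch of that lookup (the file names here are made up for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecCheck {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        // The codec is resolved from the extension alone: .bz2 -> BZip2Codec,
        // .gz -> GzipCodec, anything unrecognized -> null (file is read as-is).
        for (String p : new String[] {"data.txt.bz2", "data.gz", "data.tar.bz2", "data.txt"}) {
            CompressionCodec codec = factory.getCodec(new Path(p));
            System.out.println(p + " -> " + (codec == null ? "no codec" : codec.getClass().getSimpleName()));
        }
    }
}

Note that data.tar.bz2 resolves to BZip2Codec just like data.txt.bz2: only the outer compression layer is undone, and whatever is inside (here, a tar stream) is handed to the mapper as-is.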

The properties in mapred-site.xml are:

<property>
  <name>mapred.compress.map.output</name>
  <value>true</value>
</property>

<property>
   <name>mapred.map.output.compression.codec</name>
   <value>org.apache.hadoop.io.compress.BZip2Codec</value>
</property>
<property>
  <name>mapred.output.compression.type</name>
  <value>BLOCK</value>
</property>
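
As an aside, these mapred.* names are the pre-YARN ones; the job log below prints deprecation warnings for them. A sketch of the same settings under the Hadoop 2.x property names (assuming BZip2 is the codec you want everywhere):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;

public class CompressionConf {
    public static Configuration create() {
        Configuration conf = new Configuration();
        // Compress intermediate map output (old name: mapred.compress.map.output)
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
        // Compress the final job output (old name: mapred.output.compress)
        conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
        conf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
        return conf;
    }
}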

This is my code:

package org.myorg;

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.util.NativeCodeLoader;        
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.io.compress.GzipCodec;        
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.io.compress.BZip2Codec;

public class MySort {

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable Marks = new IntWritable();
        private Text name = new Text();
        String one, two;
        int num;

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                one = tokenizer.nextToken();
                name.set(one);
                if (tokenizer.hasMoreTokens())
                    two = tokenizer.nextToken();
                num = Integer.parseInt(two);
                Marks.set(num);
                context.write(name, Marks);
            }
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // conf.set("mapreduce.job.inputformat.class", "com.wizecommerce.utils.mapred.TextInputFormat");
        // conf.set("mapreduce.job.outputformat.class", "com.wizecommerce.utils.mapred.TextOutputFormat");
        // conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setBoolean("mapred.output.compress", true);
        // conf.setBoolean("mapreduce.output.fileoutputformat.compress", false);
        conf.set("mapred.output.compression.type", "BLOCK");
        // conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
        // conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
        conf.setClass("mapred.map.output.compression.codec", BZip2Codec.class, CompressionCodec.class);

        Job job = new Job(conf, "mysort");
        job.setJarByClass(org.myorg.MySort.class);
        job.setJobName("mysort");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // FileInputFormat.setCompressInput(job, true);
        FileOutputFormat.setCompressOutput(job, true);
        // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        // conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString());
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
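
Independently of the compression question, map() above has a latent bug: when a line ends with a name that has no following mark, Integer.parseInt(two) still runs, on either a stale value from the previous pair or on null. Here is a defensive drop-in variant of that method (a sketch, assuming the intended record format is a name token followed by an integer; it skips malformed pairs instead of failing the whole task):

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    StringTokenizer tokenizer = new StringTokenizer(value.toString());
    while (tokenizer.hasMoreTokens()) {
        String one = tokenizer.nextToken();
        if (!tokenizer.hasMoreTokens()) {
            break; // dangling name with no mark: skip it
        }
        String two = tokenizer.nextToken();
        try {
            Marks.set(Integer.parseInt(two));
        } catch (NumberFormatException e) {
            continue; // non-numeric mark (e.g. stray archive bytes): skip the pair
        }
        name.set(one);
        context.write(name, Marks);
    }
}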

These are the commands, put together in a makefile:

run:    all
        -sudo ./a.out
        sudo chmod 777 -R Data
        -sudo rm data.tar.bz2
        sudo tar -cvjf data.tar.bz2 Data/data.txt
        sudo javac -classpath /home/hduser/12115_Select_Query/hadoop-core-1.1.2.jar -d mysort MySort.java
        sudo jar -cvf mysort.jar -C mysort/ .
        -hadoop fs -rmr MySort/output
        -hadoop fs -rmr MySort/input
        hadoop fs -mkdir MySort/input
        hadoop fs -put data.tar.bz2 MySort/input
        hadoop jar mysort.jar org.myorg.MySort MySort/input/ MySort/output
        -sudo rm /home/hduser/Out/sort.txt
        hadoop fs -copyToLocal MySort/output/part-r-00000 /home/hduser/Out/sort.txt
        sudo gedit /home/hduser/Out/sort.txt

all:    rdata.c
        -sudo rm a.out
        -gcc rdata.c -o a.out

exec:   run

.PHONY: exec run
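
One thing worth noting about the run target: tar -cvjf data.tar.bz2 Data/data.txt does not just compress data.txt, it first wraps it in a tar archive and then compresses the archive. Hadoop's codec layer only undoes the outer bzip2 compression; it does not unpack archive containers, so the mapper receives the raw tar stream, 512-byte tar headers included. Compressing the bare file instead (for example bzip2 -k Data/data.txt, then hadoop fs -put Data/data.txt.bz2 MySort/input) avoids the container entirely.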

Command:

hadoop jar mysort.jar org.myorg.MySort MySort/input/ MySort/output

The output is as follows:

Java HotSpot(TM) 64-Bit Server VM warning: You have loaded library /usr/local/hadoop/lib/native/libhadoop.so.1.0.0 which might have disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
14/06/25 11:20:28 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/06/25 11:20:28 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
14/06/25 11:20:29 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
14/06/25 11:20:29 INFO input.FileInputFormat: Total input paths to process : 1
14/06/25 11:20:29 INFO mapreduce.JobSubmitter: number of splits:1
14/06/25 11:20:29 INFO Configuration.deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
14/06/25 11:20:29 INFO Configuration.deprecation: mapred.map.output.compression.codec is deprecated. Instead, use mapreduce.map.output.compress.codec
14/06/25 11:20:29 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1403675322820_0001
14/06/25 11:20:30 INFO impl.YarnClientImpl: Submitted application application_1403675322820_0001
14/06/25 11:20:30 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1403675322820_0001/
14/06/25 11:20:30 INFO mapreduce.Job: Running job: job_1403675322820_0001
14/06/25 11:20:52 INFO mapreduce.Job: Job job_1403675322820_0001 running in uber mode : false
14/06/25 11:20:52 INFO mapreduce.Job:  map 0% reduce 0%
14/06/25 11:21:10 INFO mapreduce.Job: Task Id : attempt_1403675322820_0001_m_000000_0, Status : FAILED
Error: java.lang.NumberFormatException: For input string: "0ustar"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at org.myorg.MySort$Map.map(MySort.java:36)
    at org.myorg.MySort$Map.map(MySort.java:23)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

14/06/25 11:21:29 INFO mapreduce.Job: Task Id : attempt_1403675322820_0001_m_000000_1, Status : FAILED
Error: java.lang.NumberFormatException: For input string: "0ustar"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at org.myorg.MySort$Map.map(MySort.java:36)
    at org.myorg.MySort$Map.map(MySort.java:23)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

14/06/25 11:21:49 INFO mapreduce.Job: Task Id : attempt_1403675322820_0001_m_000000_2, Status : FAILED
Error: java.lang.NumberFormatException: For input string: "0ustar"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at org.myorg.MySort$Map.map(MySort.java:36)
    at org.myorg.MySort$Map.map(MySort.java:23)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

14/06/25 11:22:10 INFO mapreduce.Job:  map 100% reduce 100%
14/06/25 11:22:10 INFO mapreduce.Job: Job job_1403675322820_0001 failed with state FAILED due to: Task failed task_1403675322820_0001_m_000000
Job failed as tasks failed. failedMaps:1 failedReduces:0

14/06/25 11:22:10 INFO mapreduce.Job: Counters: 9
    Job Counters 
        Failed map tasks=4
        Launched map tasks=4
        Other local map tasks=3
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=69797
        Total time spent by all reduces in occupied slots (ms)=0
        Total time spent by all map tasks (ms)=69797
        Total vcore-seconds taken by all map tasks=69797
        Total megabyte-seconds taken by all map tasks=71472128
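
The failing token itself supports the tar-container suspicion: "ustar" is the magic string present in every POSIX tar file header, so "0ustar" almost certainly comes from the archive's own metadata rather than from out-of-sequence data records.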

I also tried using this:

hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.3.0.jar   -Dmapred.output.compress=true   -Dmapred.compress.map.output=true   -Dmapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec   -Dmapred.reduce.tasks=0   -input MySort/input/data.txt   -output MySort/zip1

This successfully created compressed files:

hadoop fs -ls MySort/zip1

Found 3 items
-rw-r--r--   1 hduser supergroup          0 2014-06-25 10:43 MySort/zip1/_SUCCESS
-rw-r--r--   1 hduser supergroup   42488018 2014-06-25 10:43 MySort/zip1/part-00000.bz2
-rw-r--r--   1 hduser supergroup   42504084 2014-06-25 10:43 MySort/zip1/part-00001.bz2

And then ran this:

hadoop jar mysort.jar org.myorg.MySort MySort/input/ MySort/zip1

It still doesn't work. Am I missing something here?

Everything runs fine when I skip the bz2 compressed file and pass the plain text file Data/data.txt directly, i.e. upload it to MySort/input in HDFS (hadoop fs -put Data/data.txt MySort/input).

Thanks for any help.

Solution

  1. I solved this by using ToolRunner:

    package org.myorg;
    
    import java.io.IOException;
    import java.util.*;
    import org.apache.hadoop.util.NativeCodeLoader;        
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.conf.*;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionInputStream;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.io.compress.Decompressor;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.io.compress.GzipCodec;        
    import org.apache.hadoop.io.compress.*;
    import org.apache.hadoop.io.compress.BZip2Codec;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    public class ToolMapReduce extends Configured implements Tool 
    {
    
    
        public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> 
        {
            private final static IntWritable Marks = new IntWritable();
            private Text name = new Text();
            String one,two;
            int num;
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
            {
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                while (tokenizer.hasMoreTokens()) 
                {
                one=tokenizer.nextToken();
                name.set(one);
                if(tokenizer.hasMoreTokens())
                    two=tokenizer.nextToken();
                num=Integer.parseInt(two);
                Marks.set(num);
                context.write(name, Marks);
                }
            }
        } 
    
        public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> 
        {
    
            public void reduce(Text key, Iterable<IntWritable> values, Context context) 
            throws IOException, InterruptedException 
            {
                int sum = 0;
                for (IntWritable val : values) 
                {
                sum += val.get();
                }
                context.write(key, new IntWritable(sum));
            }
        }
    
        public static void main(String[] args) throws Exception  
        {
            int res = ToolRunner.run(new Configuration(), new ToolMapReduce(), args);
            System.exit(res);
        }
    
        public int run(String[] args) throws Exception
        {   
    
            Configuration conf = this.getConf();
            //Configuration conf = new Configuration();
            //conf.setOutputFormat(SequenceFileOutputFormat.class); 
            //SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK); 
            //SequenceFileOutputFormat.setCompressOutput(conf, true); 
            //conf.set("mapred.output.compress","true");
            //  conf.set("mapred.output.compression","org.apache.hadoop.io.compress.SnappyCodec");
    
            //conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.SnappyCodec");
            //  conf.set("mapreduce.job.inputformat.class", "com.wizecommerce.utils.mapred.TextInputFormat");
    
            //  conf.set("mapreduce.job.outputformat.class", "com.wizecommerce.utils.mapred.TextOutputFormat");
            //  conf.setBoolean("mapreduce.map.output.compress",true);
            conf.setBoolean("mapred.output.compress",true);
            //conf.setBoolean("mapreduce.output.fileoutputformat.compress",false);
            //conf.setBoolean("mapreduce.map.output.compress",true);
            conf.set("mapred.output.compression.type", "BLOCK");     
            //conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
            //      conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
            conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
            Job job = new Job(conf, "mysort");
            job.setJarByClass(org.myorg.ToolMapReduce.class);
            //job.setJarByClass(org.myorg.MySort.class);
            job.setJobName("mysort");
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
    
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            //  FileInputFormat.setCompressInput(job,true);
            FileOutputFormat.setCompressOutput(job, true);
            //FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
            //  conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString()); 
    
            FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            return job.waitForCompletion(true) ? 0 : 1;
            //job.waitForCompletion(true);
        }
    
    
    }
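
    For context: ToolRunner.run() passes the command line through GenericOptionsParser before invoking run(), which is exactly what the "Implement the Tool interface and execute your application with ToolRunner" warning in the job log was pointing at. It also means settings can be supplied per run instead of being hard-coded, for example:

    hadoop jar mysort.jar org.myorg.ToolMapReduce -D mapreduce.output.fileoutputformat.compress=true MySort/input MySort/output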
    
  2. From https://stackoverflow.com/questions/24401674/running-hadoop-with-compressed-files-as-input-data-input-read-by-hadoop-not-in, licensed under cc-by-sa and the MIT license.