복붙노트

[HADOOP] 하둡 맵리 듀스에서 XML 파싱

HADOOP

하둡 맵리 듀스에서 XML 파싱

나는 CSV로 XML을 구문 분석을위한 맵리 듀스 코드를 작성했습니다. 그러나 나는 그 일을 실행 한 후 내 출력 디렉터리에 출력을 찾을 수 없습니다. 나는 파일을 읽을 수 없습니다 또는 서면되지 않은 경우 확실하지 않다. 나는 하둡 맵리 듀스에 새로운입니다.

당신이 도와 주실 래요?

이 내 전체 코드.

public class XmlParser11
{
        public static String outvalue;
        public static class XmlInputFormat1 extends TextInputFormat {
        public static final String START_TAG_KEY = "xmlinput.start";
        public static final String END_TAG_KEY = "xmlinput.end";

        public RecordReader<LongWritable, Text> createRecordReader(
                InputSplit split, TaskAttemptContext context) {
            return new XmlRecordReader();
        }
        public static class XmlRecordReader extends
                RecordReader<LongWritable, Text> {
            private byte[] startTag;
            private byte[] endTag;
            private long start;
            private long end;
            private FSDataInputStream fsin;
            private DataOutputBuffer buffer = new DataOutputBuffer();

            private LongWritable key = new LongWritable();
            private Text value = new Text();
                @Override
            public void initialize(InputSplit split, TaskAttemptContext context)
                    throws IOException, InterruptedException {
                    System.out.println("B");
                Configuration conf = context.getConfiguration();
                startTag = conf.get(START_TAG_KEY).getBytes("utf-8");
                endTag = conf.get(END_TAG_KEY).getBytes("utf-8");
                FileSplit fileSplit = (FileSplit) split;

                // open the file and seek to the start of the split
                start = fileSplit.getStart();
                end = start + fileSplit.getLength();
                Path file = fileSplit.getPath();
                FileSystem fs = file.getFileSystem(conf);
                fsin = fs.open(fileSplit.getPath());
                fsin.seek(start);

            }
        @Override
            public boolean nextKeyValue() throws IOException,
                    InterruptedException {
            System.out.println("C");
                if (fsin.getPos() < end) {
                    if (readUntilMatch(startTag, false)) {
                        try {
                            buffer.write(startTag);
                            if (readUntilMatch(endTag, true)) {
                                key.set(fsin.getPos());
                                value.set(buffer.getData(), 0,
                                        buffer.getLength());
                                return true;
                            }
                        } finally {
                            buffer.reset();
                        }
                    }
                }
                return false;
            }
        @Override
           public LongWritable getCurrentKey() throws IOException,
                    InterruptedException {
                return key;
            }

        @Override
            public Text getCurrentValue() throws IOException,
                    InterruptedException {

                return value;
            }
        @Override
            public void close() throws IOException {
                fsin.close();
            }
        @Override
            public float getProgress() throws IOException {

                return (fsin.getPos() - start) / (float) (end - start);
            }

            private boolean readUntilMatch(byte[] match, boolean withinBlock)
                    throws IOException {
                int i = 0;

                while (true) {
                    int b = fsin.read();
                    // end of file:
                    if (b == -1)
                        return false;
                    // save to buffer:
                    if (withinBlock)
                        buffer.write(b);
                    // check if we're matching:
                    if (b == match[i]) {
                        i++;
                        if (i >= match.length)
                            return true;
                    } else
                        i = 0;
                    // see if we've passed the stop point:
                    if (!withinBlock && i == 0 && fsin.getPos() >= end)
                        return false;
                }
            }
        }
    }


        public static class Map extends Mapper<Text, Text,
        Text, Text> {
            @SuppressWarnings("unchecked")
            @Override
            protected void map(Text key, Text value,
                     @SuppressWarnings("rawtypes") Mapper.Context context)
                             throws
                             IOException, InterruptedException {

                String document = value.toString();
                System.out.println("‘" + document + "‘");

                XMLInputFactory xmlif = XMLInputFactory.newInstance();
                XMLStreamReader xmlr;

            try {
                xmlr = xmlif.createXMLStreamReader(new FileReader(document));
                while(xmlr.hasNext())
                {
                   printEvent(xmlr);
                   xmlr.next();
                 }
                   xmlr.close();
                   context.write(null,new Text (outvalue));
            } catch (XMLStreamException e) {

                e.printStackTrace();
            }
            }
                   private void printEvent(XMLStreamReader xmlr) {

                       switch (xmlr.getEventType()) {

                       case XMLStreamConstants.START_ELEMENT:
                          print(xmlr);
                           break;

                       case XMLStreamConstants.CHARACTERS:
                           int start = xmlr.getTextStart();
                           int length = xmlr.getTextLength();
                           System.out.print(new String(xmlr.getTextCharacters(),
                                      start,
                                      length));
                           break;
                       }
                   }
                   private  String print(XMLStreamReader xmlr){
                        if(xmlr.hasName()){
                          for (int i=0; i < xmlr.getAttributeCount(); i++) {
                              String localName = xmlr.getLocalName();
                              if (localName != null);
                              String attName = xmlr.getAttributeLocalName(i);
                                String value = xmlr.getAttributeValue(i);
                                System.out.print(",");
                                String outvalue = localName +":"+ attName +"-"+value;
                                System.out.print(outvalue);
                          }
                        } return outvalue;
                      }

  }
        public static void main(String[] args) throws Exception
        {
                Configuration conf = new Configuration();

                conf.set("xmlinput.start", "<FICHER>");
                conf.set("xmlinput.end", "</FICHER>");
                Job job = new Job(conf);
                job.setJarByClass(XmlParser11.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);

                job.setMapperClass(XmlParser11.Map.class);
                job.setNumReduceTasks(0);

                job.setInputFormatClass(XmlInputFormat1.class);
                job.setOutputFormatClass(TextOutputFormat.class);
                FileInputFormat.addInputPath(job, new Path(args[0]));
                FileOutputFormat.setOutputPath(job, new Path(args[1]));

                job.waitForCompletion(true);
        }

여기에 아웃 퍼티의 투입

File System Counters 
        FILE: Number of bytes read=0 strong text>                 
        FILE: Number of bytes written=120678
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=1762671
        HDFS: Number of bytes written=0
        HDFS: Number of read operations=5
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
Job Counters
        Launched map tasks=1
        Rack-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=15960
        Total time spent by all reduces in occupied slots (ms)=0
        Total time spent by all map tasks (ms)=3990
        Total vcore-seconds taken by all map tasks=3990
        Total megabyte-seconds taken by all map tasks=16343040
Map-Reduce Framework
        Map input records=0
        Map output records=0
        Input split bytes=124
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=0
        CPU time spent (ms)=1390
        Physical memory (bytes) snapshot=513449984
        Virtual memory (bytes) snapshot=4122763264
        Total committed heap usage (bytes)=2058354688
File Input Format Counters
        Bytes Read=1762547
File Output Format Counters
        Bytes Written=0

해결법

  1. ==============================

    1.나는 문제가 태그를 시작으로 생각합니다.

    나는 문제가 태그를 시작으로 생각합니다.

     conf.set("xmlinput.start", "<FICHER");`
     conf.set("xmlinput.end", "</FICHER>");
    

    이 당신을 도움이되기를 바랍니다.

  2. from https://stackoverflow.com/questions/32608540/xml-parsing-in-hadoop-mapreduce by cc-by-sa and MIT license