복붙노트

[HADOOP] hadoop에 Stackoverflow의 posts.xml 구문 분석

HADOOP

hadoop에 Stackoverflow의 posts.xml 구문 분석

나는이 기사를 codeproject의 Anoop Madhusudanan이 클러스터에 없지만 내 시스템에 추천 엔진을 작성하기 위해 사용하고있다.

문제는 내가 구조를 다음과 같습니다 posts.xml 구문 분석 할 때입니다 :

 <row Id="99" PostTypeId="2" ParentId="88" CreationDate="2008-08-01T14:55:08.477" Score="2" Body="&lt;blockquote&gt;&#xD;&#xA;  &lt;p&gt;The actual resolution of gettimeofday() depends on the hardware architecture. Intel processors as well as SPARC machines offer high resolution timers that measure microseconds. Other hardware architectures fall back to the system’s timer, which is typically set to 100 Hz. In such cases, the time resolution will be less accurate. &lt;/p&gt;&#xD;&#xA;&lt;/blockquote&gt;&#xD;&#xA;&#xD;&#xA;&lt;p&gt;I obtained this answer from &lt;a href=&quot;http://www.informit.com/guides/content.aspx?g=cplusplus&amp;amp;seqNum=272&quot; rel=&quot;nofollow&quot;&gt;High Resolution Time Measurement and Timers, Part I&lt;/a&gt;&lt;/p&gt;" OwnerUserId="25" LastActivityDate="2008-08-01T14:55:08.477" />

이제 hadoop에서이 파일 (크기 1.4GB)을 파싱 할 필요가있다. 자바 코드를 작성하고 jar 파일을 만들었다. Java 클래스는 다음과 같습니다.

import java.io.IOException;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.DocumentBuilder;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.w3c.dom.Node;
import org.w3c.dom.Element;

import java.io.File;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;


public class Recommend {

    static class Map extends Mapper<Text, Text, Text, Text> {
        Path path;
        String fXmlFile;
        DocumentBuilderFactory dbFactory;
        DocumentBuilder dBuilder;
        Document doc;

        /**
         * Given an output filename, write a bunch of random records to it.
         */
        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            try{
                fXmlFile=value.toString();
                dbFactory = DocumentBuilderFactory.newInstance();
                dBuilder= dbFactory.newDocumentBuilder();
                doc= dBuilder.parse(fXmlFile);

                doc.getDocumentElement().normalize();
                NodeList nList = doc.getElementsByTagName("row");

                for (int temp = 0; temp < nList.getLength(); temp++) {

                    Node nNode = nList.item(temp);
                    Element eElement = (Element) nNode;

                    Text keyWords =new Text(eElement.getAttribute("OwnerUserId"));
                    Text valueWords = new Text(eElement.getAttribute("ParentId"));
                    String val=keyWords.toString()+" "+valueWords.toString();
                    // Write the sentence 
                    if(keyWords != null && valueWords != null){
                        output.collect(keyWords, new Text(val));
                    }
                }

            }catch (Exception e) {
                e.printStackTrace();
            } 
        }
    }

    /**
     * 
     * @throws IOException 
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        /*if (args.length != 2) {
          System.err.println("Usage: wordcount <in> <out>");
          System.exit(2);
        }*/
//      FileSystem fs = FileSystem.get(conf);
        Job job = new Job(conf, "Recommend");
        job.setJarByClass(Recommend.class);

        // the keys are words (strings)
        job.setOutputKeyClass(Text.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        // the values are counts (ints)
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Map.class);
        //conf.setReducerClass(Reduce.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
         Path outPath = new Path(args[1]);
            FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
            if (dfs.exists(outPath)) {
            dfs.delete(outPath, true);
            }
    }
}

Output이 OwnerUserId ParentId와 같은 출력을 포함하는 hadoop에있는 파일 인 것으로 기대합니다. 대신 출력은 다음과 같습니다.

1599788   <row Id="2292" PostTypeId="2" ParentId="2284" CreationDate="2008-08-05T13:28:06.700" Score="0" ViewCount="0" Body="&lt;p&gt;The first thing you should do is contact the main people who run the open source project. Ask them if it is ok to contribute to the code and go from there.&lt;/p&gt;&#xD;&#xA;&#xD;&#xA;&lt;p&gt;Simply writing your improved code and then giving it to them may result in your code being rejected.&lt;/p&gt;" OwnerUserId="383" LastActivityDate="2008-08-05T13:28:06.700" />

매퍼 (mapper)의 핵심 가치로 나타나는 1599788의 출처에 대해 알지 못합니다.

hadoop에 대한 mapper 클래스를 작성하는 것에 대해 많이 알지 못합니다. 원하는 결과를 얻으려면 코드를 수정하는 데 도움이 필요합니다.

미리 감사드립니다.

해결법

  1. ==============================

    1.많은 연구와 실험을 거친 끝에 필자가 제공 한 구문과 같은 parsin xml 파일에 대한 맵을 작성하는 방법을 배웠습니다. 나는 나의 접근법을 바꿨다. 그리고 이것은 나의 새로운 매퍼 코드 다 ... 나의 usecase를위한 그것의 일.

    많은 연구와 실험을 거친 끝에 필자가 제공 한 구문과 같은 parsin xml 파일에 대한 맵을 작성하는 방법을 배웠습니다. 나는 나의 접근법을 바꿨다. 그리고 이것은 나의 새로운 매퍼 코드 다 ... 나의 usecase를위한 그것의 일.

    누군가가 도움이되기를 희망하고 시간을 절약 할 수 있습니다 :)

    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import javax.xml.parsers.ParserConfigurationException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.xml.sax.SAXException;
    
    public class Map extends Mapper<LongWritable, Text, NullWritable, Text> {
        NullWritable obj;
    
        @Override
        public void map(LongWritable key, Text value, Context context) throws InterruptedException {
            StringTokenizer tok= new StringTokenizer(value.toString()); 
            String pa=null,ow=null,pi=null,v;
            while (tok.hasMoreTokens()) {
                String[] arr;
                String val = (String) tok.nextToken();
                if(val.contains("PostTypeId")){
                    arr= val.split("[\"]");
                    pi=arr[arr.length-1];
                    if(pi.equals("2")){
                        continue;
                    }
                    else break;
                }
                if(val.contains("ParentId")){
                    arr= val.split("[\"]");
                    pa=arr[arr.length-1];
                } 
                else if(val.contains("OwnerUserId") ){
                    arr= val.split("[\"]");
                    ow=arr[arr.length-1];
                    try {
                        if(pa!=null && ow != null){
                            v=String.format("{0},{1}", ow,pa);
                            context.write(obj,new Text(v));
    
                        }
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
    
    
        }
    
    }
    
  2. ==============================

    2.여기에 XML을 파싱하고 다른 맵 축소 작업이나 Hive 또는 Pig에서 사용되는 hadoop에 탭으로 구분 된 파일을 작성하기 위해 작성한 매퍼가 있습니다.

    여기에 XML을 파싱하고 다른 맵 축소 작업이나 Hive 또는 Pig에서 사용되는 hadoop에 탭으로 구분 된 파일을 작성하기 위해 작성한 매퍼가 있습니다.

    매퍼

    package com.aravind.learning.hadoop.mapred.techtalks;
    
    import java.io.IOException;
    import java.io.StringReader;
    import java.text.DateFormat;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import javax.xml.parsers.DocumentBuilder;
    import javax.xml.parsers.DocumentBuilderFactory;
    import javax.xml.parsers.ParserConfigurationException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.w3c.dom.Document;
    import org.w3c.dom.Element;
    import org.xml.sax.InputSource;
    import org.xml.sax.SAXException;
    
    import com.google.common.base.Joiner;
    
    public class StackoverflowDataWranglerMapper extends Mapper<LongWritable, Text, Text, Text>
    {
        static enum BadRecordCounters
        {
            NO_CREATION_DATE, UNKNOWN_USER_ID, UNPARSEABLE_RECORD, UNTAGGED_POSTS
        }
    
        private final Text outputKey = new Text();
        private final Text outputValue = new Text();
    
        private final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        private DocumentBuilder builder;
        private static final Joiner TAG_JOINER = Joiner.on(",").skipNulls();
        // 2008-07-31T21:42:52.667
        private static final DateFormat DATE_PARSER = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
        private static final SimpleDateFormat DATE_BUILDER = new SimpleDateFormat("yyyy-MM-dd");
    
        @Override
        protected void setup(Context context) throws IOException, InterruptedException
        {
            try
            {
                builder = factory.newDocumentBuilder();
            }
            catch (ParserConfigurationException e)
            {
                new IOException(e);
            }
        }
    
        @Override
        protected void map(LongWritable inputKey, Text inputValue, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException
        {
            try
            {
                String entry = inputValue.toString();
                if (entry.contains("<row "))
                {
                    Document doc = builder.parse(new InputSource(new StringReader(entry)));
                    Element rootElem = doc.getDocumentElement();
    
                    String id = rootElem.getAttribute("Id");
                    String postedBy = rootElem.getAttribute("OwnerUserId").trim();
                    String viewCount = rootElem.getAttribute("ViewCount");
                    String postTypeId = rootElem.getAttribute("PostTypeId");
                    String score = rootElem.getAttribute("Score");
                    String title = rootElem.getAttribute("Title");
                    String tags = rootElem.getAttribute("Tags");
                    String answerCount = rootElem.getAttribute("AnswerCount");
                    String commentCount = rootElem.getAttribute("CommentCount");
                    String favoriteCount = rootElem.getAttribute("FavoriteCount");
                    String creationDate = rootElem.getAttribute("CreationDate");
    
                    Date parsedDate = null;
                    if (creationDate != null && creationDate.trim().length() > 0)
                    {
                        try
                        {
                            parsedDate = DATE_PARSER.parse(creationDate);
                        }
                        catch (ParseException e)
                        {
                            context.getCounter("Bad Record Counters", "Posts missing CreationDate").increment(1);
                        }
                    }
    
                    if (postedBy.length() == 0 || postedBy.trim().equals("-1"))
                    {
                        context.getCounter("Bad Record Counters", "Posts with either empty UserId or UserId contains '-1'")
                                .increment(1);
                        try
                        {
                            parsedDate = DATE_BUILDER.parse("2100-00-01");
                        }
                        catch (ParseException e)
                        {
                            // ignore
                        }
                    }
    
                    tags = tags.trim();
                    String tagTokens[] = null;
    
                    if (tags.length() > 1)
                    {
                        tagTokens = tags.substring(1, tags.length() - 1).split("><");
                    }
                    else
                    {
                        context.getCounter("Bad Record Counters", "Untagged Posts").increment(1);
                    }
    
                    outputKey.clear();
                    outputKey.set(id);
    
                    StringBuilder sb = new StringBuilder(postedBy).append("\t").append(parsedDate.getTime()).append("\t")
                            .append(postTypeId).append("\t").append(title).append("\t").append(viewCount).append("\t").append(score)
                            .append("\t");
    
                    if (tagTokens != null)
                    {
                        sb.append(TAG_JOINER.join(tagTokens)).append("\t");
                    }
                    else
                    {
                        sb.append("").append("\t");
                    }
                    sb.append(answerCount).append("\t").append(commentCount).append("\t").append(favoriteCount).toString();
    
                    outputValue.set(sb.toString());
    
                    context.write(outputKey, outputValue);
                }
            }
            catch (SAXException e)
            {
                context.getCounter("Bad Record Counters", "Unparsable records").increment(1);
            }
            finally
            {
                builder.reset();
            }
        }
    }
    

    운전사

    public class StackoverflowDataWranglerDriver extends Configured implements Tool
    {
        @Override
        public int run(String[] args) throws Exception
        {
            if (args.length != 2)
            {
                System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName());
                ToolRunner.printGenericCommandUsage(System.err);
                return -1;
            }
    
            Job job = Job.getInstance(getConf());
    
            job.setJobName("Tech Talks - Stackoverflow Forum Posts - Data Wrangler");
    
            TextInputFormat.addInputPath(job, new Path(args[0]));
    
            TextOutputFormat.setOutputPath(job, new Path(args[1]));
    
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
    
            job.setJarByClass(StackoverflowDataWranglerMapper.class);// required for mr1
            job.setMapperClass(StackoverflowDataWranglerMapper.class);
            job.setNumReduceTasks(0);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
            return job.waitForCompletion(true) ? 0 : 1;
        }
    
        public static void main(String args[]) throws Exception
        {
            int exitCode = ToolRunner.run(new Configuration(), new StackoverflowDataWranglerDriver(), args);
            System.exit(exitCode);
        }
    }
    

    작업 제출 명령

    hadoop jar ./hadoop-examples-0.0.1-SNAPSHOT.jar com.aravind.learning.hadoop.mapred.techtalks.StackoverflowDataWranglerDriver data/stackoverflow-posts.xml data/so-posts-tsv
    
  3. from https://stackoverflow.com/questions/19445528/parsing-of-stackoverflows-posts-xml-on-hadoop by cc-by-sa and MIT license