
[HADOOP] How to store processed data from HDFS using MapReduce in MongoDB as output

HADOOP

I have a MapReduce application that processes data from HDFS and stores its output back into HDFS.

However, I now need to store the output data in MongoDB instead of writing it to HDFS.

Can anyone tell me how to do that?

Thanks.

Mapper class

package com.mapReduce;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FMapper extends Mapper<LongWritable, Text, Text, Text> {
    private String pART;
    private String actual;
    private String fdate;
    // Split each CSV record, key by the part column (data[1]) and emit
    // "<yyyy/mm/dd date>,<actual (last column)>,<remaining columns>" as the value.
    public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {
        String tempString = ivalue.toString();
        String[] data = tempString.split(",");
        pART=data[1];
        try{
            fdate=convertyymmdd(data[0]);
            /**IF ACTUAL IS LAST HEADER
             * actual=data[2];
             * */
            actual=data[data.length-1];
            context.write(new Text(pART), new Text(fdate+","+actual+","+dynamicVariables(data)));
        }catch(ArrayIndexOutOfBoundsException ae){
            System.err.println(ae.getMessage());
        }

    }


    public static String convertyymmdd(String date){

        String dateInString=null;
        String data[] =date.split("/");
        String month=data[0];
        String day=data[1];
        String year=data[2];
        dateInString =year+"/"+month+"/"+day;
        System.out.println(dateInString);   
        return dateInString;
    }

    public static String dynamicVariables(String[] data){
        StringBuilder str=new StringBuilder();
        boolean isfirst=true; 
    /** IF ACTUAL IS LAST HEADER
     * for(int i=3;i<data.length;i++){ */
        for(int i=2;i<data.length-1;i++){

            if(isfirst){
                str.append(data[i]);
                isfirst=false;
            }
            else
                str.append("," + data[i]);
        }
        return str.toString();
    }

}

Reducer class

package com.mapReduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import javax.faces.bean.ApplicationScoped;
import javax.faces.bean.ManagedBean;
import javax.faces.bean.ManagedProperty;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import com.ihub.bo.ForcastBO;
import com.ihub.service.ForcastService;
public class FReducer extends Reducer<Text, Text, Text, Text> {
    private String pART;
    private List<ForcastBO> list = null;
    private List<List<String>> listOfList = null;
    private List<String> vals = null;
    private static List<ForcastBO> forcastBos=new ArrayList<ForcastBO>();

    @Override
    public void reduce(Text _key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Parse each "date,actual,w0..w7" value for this part key back into a ForcastBO
        // and collect it into the per-key list created in run().
        pART = _key.toString();
        // process values
        for (Text val : values) {
            String tempString = val.toString();
            String[] data = tempString.split(",");
            ForcastBO fb=new ForcastBO();
            fb.setPart(pART);
            fb.setDate(data[0]);
            fb.setActual(data[1]);
            fb.setW0(data[2]);
            fb.setW1(data[3]);
            fb.setW2(data[4]);
            fb.setW3(data[5]);
            fb.setW4(data[6]);
            fb.setW5(data[7]);
            fb.setW6(data[8]);
            fb.setW7(data[9]);
            try {
                list.add(fb);
            } catch (Exception ae) {
                System.out.println(ae.getStackTrace() + "****" + ae.getMessage() + "*****" + ae.getLocalizedMessage());
            }
        }   
    }

    @Override
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
            while (context.nextKey()) {
                listOfList = new ArrayList<List<String>>();
                list = new ArrayList<ForcastBO>();
                reduce(context.getCurrentKey(), context.getValues(), context);
                files_WE(listOfList, list, context);
            }
        } finally {
            cleanup(context);
        }
    }


    public void files_WE(List<List<String>> listOfList, List<ForcastBO> list, Context context) {

        Collections.sort(list);

            try {
                setData(listOfList, list);

                Collections.sort(listOfList, new Comparator<List<String>>() {
                    @Override
                    public int compare(List<String> o1, List<String> o2) {
                        return o1.get(0).compareTo(o2.get(0));
                    }
                });

                for (int i = listOfList.size() - 1; i > -1; i--) {
                    List<String> list1 = listOfList.get(i);
                    int k = 1;
                    for (int j = 3; j < list1.size(); j++) {
                        try {
                            list1.set(j, listOfList.get(i - k).get(j));
                        } catch (Exception ex) {
                            list1.set(j, null);
                        }
                        k++;
                    }

                }
            } catch (Exception e) {
                //e.getLocalizedMessage();
            }

            for (List<String> ls : listOfList) {
                System.out.println(ls.get(0));
                ForcastBO forcastBO = new ForcastBO();
                try {
                    forcastBO.setPart(ls.get(0));
                    forcastBO.setDate(ls.get(1));
                    forcastBO.setActual(ls.get(2));
                    forcastBO.setW0(ls.get(3));
                    forcastBO.setW1(ls.get(4));
                    forcastBO.setW2(ls.get(5));
                    forcastBO.setW3(ls.get(6));
                    forcastBO.setW4(ls.get(7));
                    forcastBO.setW5(ls.get(8));
                    forcastBO.setW6(ls.get(9));
                    forcastBO.setW7(ls.get(10));
                    forcastBos.add(forcastBO);
                } catch (Exception e) {
                    forcastBos.add(forcastBO);
                }
                try {
                    System.out.println(forcastBO);
                    //service.setForcastBOs(forcastBos);
                } catch (Exception e) {
                    System.out.println("FB::::" + e.getStackTrace());
                }
            }
    }





        public void setData(List<List<String>> listOfList, List<ForcastBO> list) {
            List<List<String>> temListOfList=new ArrayList<List<String>>();
            for (ForcastBO str : list) {
                vals = new ArrayList<String>();
                vals.add(str.getPart());
                vals.add(str.getDate());
                vals.add(str.getActual());
                vals.add(str.getW0());
                vals.add(str.getW1());
                vals.add(str.getW2());
                vals.add(str.getW3());
                vals.add(str.getW4());
                vals.add(str.getW5());
                vals.add(str.getW6());
                vals.add(str.getW7());
                temListOfList.add(vals);
            }


            Collections.sort(temListOfList, new Comparator<List<String>>() {
                @Override
                public int compare(List<String> o1, List<String> o2) {
                    return o1.get(1).compareTo(o2.get(1));
                }
            });

            for (List<String> ls : temListOfList) {
                System.out.println(ls);
                listOfList.add(ls);
            }
        }

        public static List<ForcastBO> getForcastBos() {
            return forcastBos;
        }



    }
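One thing worth noting about the reducer as posted: it only collects ForcastBO objects into a static list and prints them, and it never calls context.write, so no output format (HDFS or MongoDB) ever receives the results. Whichever option from the answer below is chosen, the records eventually have to be emitted through the context. A small sketch of a helper that could be added to FReducer and called from files_WE (it only uses the ForcastBO getters already referenced in setData):

    private void writeRecords(List<ForcastBO> records, Context context) throws IOException, InterruptedException {
        for (ForcastBO bo : records) {
            // Re-assemble the same "date,actual,w0..w7" layout the mapper produced.
            String value = bo.getDate() + "," + bo.getActual() + ","
                    + bo.getW0() + "," + bo.getW1() + "," + bo.getW2() + "," + bo.getW3() + ","
                    + bo.getW4() + "," + bo.getW5() + "," + bo.getW6() + "," + bo.getW7();
            context.write(new Text(bo.getPart()), new Text(value));
        }
    }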

Driver class

package com.mapReduce;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class MRDriver {

    public static void main(String[] args)  throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "JobName");
        job.setJarByClass(MRDriver.class);
        // TODO: specify a mapper
        job.setMapperClass(FMapper.class);
        // TODO: specify a reducer
        job.setReducerClass(FReducer.class);

        // TODO: specify output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // TODO: delete temp file
        FileSystem hdfs = FileSystem.get(new URI("hdfs://localhost:9000"),
                conf); 
        Path workingDir=hdfs.getWorkingDirectory();

        Path newFolderPath= new Path("/sd1");
        newFolderPath=Path.mergePaths(workingDir, newFolderPath);
        if (hdfs.exists(newFolderPath)) {
            hdfs.delete(newFolderPath, true); // delete the existing output directory recursively
        }
        // TODO: specify input and output DIRECTORIES (not files)

        FileInputFormat.setInputPaths(job,new Path("hdfs://localhost:9000/Forcast/SampleData"));
        FileOutputFormat.setOutputPath(job, newFolderPath);

        if (!job.waitForCompletion(true))
            return;
    }
}

Solution

  1. Basically, what you need to do is change the "output format class", and you have a few options there (both approaches are sketched below):

     - Option 1: use the MongoDB Connector for Hadoop, which ships a ready-made output format that writes straight to MongoDB.
     - Option 2: write your own output format class that opens a connection to MongoDB and stores each record itself.
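    A minimal driver sketch for option 1 could look like the following. This is only a sketch: it assumes the mongo-hadoop connector jar is on the job classpath, and the connection URI, the forcast.results namespace and the MongoMRDriver class name are placeholders rather than anything from the original question.

package com.mapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.MongoConfigUtil;

public class MongoMRDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Tell the connector which MongoDB collection the job output goes to.
        MongoConfigUtil.setOutputURI(conf, "mongodb://localhost:27017/forcast.results");

        Job job = Job.getInstance(conf, "hdfs-to-mongodb");
        job.setJarByClass(MongoMRDriver.class);
        job.setMapperClass(FMapper.class);
        job.setReducerClass(FReducer.class);

        // Map output stays Text/Text as in the original job.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // The reducer would emit BSONWritable values (e.g. wrapping a BasicBSONObject
        // built from each ForcastBO) so that the connector can store them as documents.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BSONWritable.class);

        // MongoOutputFormat replaces FileOutputFormat, so no HDFS output directory
        // (and no directory cleanup) is needed any more.
        job.setOutputFormatClass(MongoOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/Forcast/SampleData"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}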

    In my opinion option 1 is the best choice, but I haven't used the MongoDB connector enough to say whether it is stable and works well. Option 2 requires you to really understand how Hadoop works under the hood, to avoid ending up with a lot of open connections and problems with transactions and Hadoop task retries.
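    For completeness, here is a bare-bones sketch of what option 2 might look like: a custom output format that opens one MongoDB connection per task and inserts one document per reducer output record. The class name, the forcast/results namespace and the document layout are assumptions, the error handling is deliberately minimal, and this is exactly the kind of code where the connection and retry issues mentioned above have to be handled carefully.

package com.mapReduce;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.bson.Document;

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;

public class MongoTextOutputFormat extends OutputFormat<Text, Text> {

    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context) {
        return new RecordWriter<Text, Text>() {
            // One client per task attempt; closed when the task finishes.
            private final MongoClient client = new MongoClient("localhost", 27017);
            private final MongoCollection<Document> collection =
                    client.getDatabase("forcast").getCollection("results");

            @Override
            public void write(Text key, Text value) {
                // Store the raw reducer output; a real job would split the CSV
                // value into proper fields here.
                collection.insertOne(new Document("part", key.toString())
                        .append("value", value.toString()));
            }

            @Override
            public void close(TaskAttemptContext context) {
                client.close();
            }
        };
    }

    @Override
    public void checkOutputSpecs(JobContext context) {
        // Nothing to check: MongoDB creates the collection on first insert.
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        // No files are produced, so a no-op committer is enough.
        return new NullOutputFormat<Text, Text>().getOutputCommitter(context);
    }
}

    The driver would then call job.setOutputFormatClass(MongoTextOutputFormat.class) and drop the FileOutputFormat/output-directory lines, just as in the option 1 sketch.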

  2. Source: https://stackoverflow.com/questions/32500080/how-to-store-processed-data-from-hdfs-using-mapreduce-in-mongodb-as-output (cc-by-sa and MIT license)