[HADOOP] 출력으로하여 MongoDB에서의 MapReduce를 사용 HDFS에서 처리 된 데이터를 저장하는 방법
HADOOP출력으로하여 MongoDB에서의 MapReduce를 사용 HDFS에서 처리 된 데이터를 저장하는 방법
I는 HDFS HDFS에 데이터 저장의 출력 데이터를 처리 된 MapReduce의 애플리케이션을
그러나, 지금은 HDFS로 저장하는 대신에 MongoDB의 출력 데이터를 저장해야
하나는 내가 그것을 수행하는 방법을 알려주세요?
감사합니다
매퍼 클래스
package com.mapReduce;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class FMapper extends Mapper<LongWritable, Text, Text, Text> {
private String pART;
private String actual;
private String fdate;
public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {
String tempString = ivalue.toString();
String[] data = tempString.split(",");
pART=data[1];
try{
fdate=convertyymmdd(data[0]);
/**IF ACTUAL IS LAST HEADER
* actual=data[2];
* */
actual=data[data.length-1];
context.write(new Text(pART), new Text(fdate+","+actual+","+dynamicVariables(data)));
}catch(ArrayIndexOutOfBoundsException ae){
System.err.println(ae.getMessage());
}
}
public static String convertyymmdd(String date){
String dateInString=null;
String data[] =date.split("/");
String month=data[0];
String day=data[1];
String year=data[2];
dateInString =year+"/"+month+"/"+day;
System.out.println(dateInString);
return dateInString;
}
public static String dynamicVariables(String[] data){
StringBuilder str=new StringBuilder();
boolean isfirst=true;
/** IF ACTUAL IS LAST HEADER
* for(int i=3;i<data.length;i++){ */
for(int i=2;i<data.length-1;i++){
if(isfirst){
str.append(data[i]);
isfirst=false;
}
else
str.append(","+data[i]);
}
return str.toString();
}
}
감속기의 클래스
package com.mapReduce;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import javax.faces.bean.ApplicationScoped;
import javax.faces.bean.ManagedBean;
import javax.faces.bean.ManagedProperty;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.ihub.bo.ForcastBO;
import com.ihub.service.ForcastService;
public class FReducer extends Reducer<Text, Text, Text, Text> {
private String pART;
private List<ForcastBO> list = null;
private List<List<String>> listOfList = null;
private List<String> vals = null;
private static List<ForcastBO> forcastBos=new ArrayList<ForcastBO>();
@Override
public void reduce(Text _key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
pART = _key.toString();
// process values
for (Text val : values) {
String tempString = val.toString();
String[] data = tempString.split(",");
ForcastBO fb=new ForcastBO();
fb.setPart(pART);
fb.setDate(data[0]);
fb.setActual(data[1]);
fb.setW0(data[2]);
fb.setW1(data[3]);
fb.setW2(data[4]);
fb.setW3(data[5]);
fb.setW4(data[6]);
fb.setW5(data[7]);
fb.setW6(data[8]);
fb.setW7(data[9]);
try {
list.add(fb);
} catch (Exception ae) {
System.out.println(ae.getStackTrace() + "****" + ae.getMessage() + "*****" + ae.getLocalizedMessage());
}
}
}
@Override
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
listOfList = new ArrayList<List<String>>();
list=new ArrayList<ForcastBO>();
reduce(context.getCurrentKey(), context.getValues(), context);
files_WE(listOfList, list, context);
}
}finally {
cleanup(context);
}
}
public void files_WE(List<List<String>> listOfList, List<ForcastBO> list, Context context) {
Collections.sort(list);
try {
setData(listOfList, list);
Collections.sort(listOfList, new Comparator<List<String>>() {
@Override
public int compare(List<String> o1, List<String> o2) {
return o1.get(0).compareTo(o2.get(0));
}
});
for (int i = listOfList.size() - 1; i > -1; i--) {
List<String> list1 = listOfList.get(i);
int k = 1;
for (int j = 3; j < list1.size(); j++) {
try {
list1.set(j, listOfList.get(i - k).get(j));
} catch (Exception ex) {
list1.set(j, null);
}
k++;
}
}
} catch (Exception e) {
//e.getLocalizedMessage();
}
for(List<String> ls:listOfList){
System.out.println(ls.get(0));
ForcastBO forcastBO=new ForcastBO();
try{
forcastBO.setPart(ls.get(0));
forcastBO.setDate(ls.get(1));
forcastBO.setActual(ls.get(2));
forcastBO.setW0(ls.get(3));
forcastBO.setW1(ls.get(4));
forcastBO.setW2(ls.get(5));
forcastBO.setW3(ls.get(6));
forcastBO.setW4(ls.get(7));
forcastBO.setW5(ls.get(8));
forcastBO.setW6(ls.get(9));
forcastBO.setW7(ls.get(10));
forcastBos.add(forcastBO);
}catch(Exception e){
forcastBos.add(forcastBO);
}
try{
System.out.println(forcastBO);
//service.setForcastBOs(forcastBos);
}catch(Exception e){
System.out.println("FB::::"+e.getStackTrace());
}
}
}
public void setData(List<List<String>> listOfList, List<ForcastBO> list) {
List<List<String>> temListOfList=new ArrayList<List<String>>();
for (ForcastBO str : list) {
vals = new ArrayList<String>();
vals.add(str.getPart());
vals.add(str.getDate());
vals.add(str.getActual());
vals.add(str.getW0());
vals.add(str.getW1());
vals.add(str.getW2());
vals.add(str.getW3());
vals.add(str.getW4());
vals.add(str.getW5());
vals.add(str.getW6());
vals.add(str.getW7());
temListOfList.add(vals);
}
Collections.sort(temListOfList, new Comparator<List<String>>() {
@Override
public int compare(List<String> o1, List<String> o2) {
return o1.get(1).compareTo(o2.get(1));
}
});
for(List<String> ls:temListOfList){
System.out.println(ls);
listOfList.add(ls);
}
}
public static List<ForcastBO> getForcastBos() {
return forcastBos;
}
}
드라이버 클래스
package com.mapReduce;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MRDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "JobName");
job.setJarByClass(MRDriver.class);
// TODO: specify a mapper
job.setMapperClass(FMapper.class);
// TODO: specify a reducer
job.setReducerClass(FReducer.class);
// TODO: specify output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// TODO: delete temp file
FileSystem hdfs = FileSystem.get(new URI("hdfs://localhost:9000"),
conf);
Path workingDir=hdfs.getWorkingDirectory();
Path newFolderPath= new Path("/sd1");
newFolderPath=Path.mergePaths(workingDir, newFolderPath);
if(hdfs.exists(newFolderPath))
{
hdfs.delete(newFolderPath); //Delete existing Directory
}
// TODO: specify input and output DIRECTORIES (not files)
FileInputFormat.setInputPaths(job,new Path("hdfs://localhost:9000/Forcast/SampleData"));
FileOutputFormat.setOutputPath(job, newFolderPath);
if (!job.waitForCompletion(true))
return;
}
}
해결법
-
==============================
1.기본적으로 당신이 필요로하는 무엇 "출력 형식의 클래스"를 변경하는 것입니다, 당신은 거기에 몇 가지 방법이있다 :
기본적으로 당신이 필요로하는 무엇 "출력 형식의 클래스"를 변경하는 것입니다, 당신은 거기에 몇 가지 방법이있다 :
제 생각에는 옵션 1은 최선의 방법입니다하지만 난 충분히 안정적이고 작동하는지 말을 MongoDB의 커넥터를 사용하지 않았습니다. 옵션 2 당신이 정말로 열려있는 연결 및 거래와 하둡 작업 재시도 문제의 많은 끝을 방지하기 위해 하둡 언더 후드를 작동하는 방법을 이해하는 것이 필요합니다.
from https://stackoverflow.com/questions/32500080/how-to-store-processed-data-from-hdfs-using-mapreduce-in-mongodb-as-output by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 어떻게로드 명령을 사용하여 돼지의 폴더에 여러 개의 텍스트 파일을로드하려면? (0) | 2019.10.16 |
---|---|
[HADOOP] HBase를도 최대 버전을 반환에게 이전 값을 얻을 수 = 1 (0) | 2019.10.16 |
[HADOOP] 하이브 예외 java.lang.RuntimeException가 : java.lang.ClassCastException가 : (0) | 2019.10.16 |
[HADOOP] 하둡 메모리 java.io.IOException 상위를 할당 할 수 없습니다 : 오류 = 12 (0) | 2019.10.16 |
[HADOOP] 오류 : 데이터 노드를 찾지 못했습니다 클러스터 상태를 확인하는 것이 좋습니다 (0) | 2019.10.16 |