[HADOOP] 작업 시도가 600 초 동안 상태를보고하지 못해 감소가 실패합니다. 죽이는! 해결책?
HADOOP작업 시도가 600 초 동안 상태를보고하지 못해 감소가 실패합니다. 죽이는! 해결책?
작업 감소 단계가 실패합니다.
각 작업이 실패하는 이유는 다음과 같습니다.
시도 attempt_201301251556_1637_r_000005_0이 (가) 600 초 동안 상태를보고하지 못했습니다. 죽이는!
문제의 세부 사항 :
맵 단계는 시간, 제거, 데이터 형식의 각 레코드를 가져옵니다.
데이터 형식은 데이터 요소와 그 수입니다.
예 : a, 1b, 4c, 7은 레코드의 데이터에 해당합니다.
매퍼는 모든 데이터 요소에 대해 모든 레코드의 데이터를 출력합니다. 예 :
키 : (시간, a,), val : (제거, 데이터) 키 : (시간, b,), val : (제거, 데이터) 키 : (시간, c,), val : (제거, 데이터)
모든 감소는 모든 레코드에서 동일한 키에 해당하는 모든 데이터를받습니다. 예 : 키 : (시간, a), 값 : (rid1, 데이터) 및 키 : (시간, a), 값 : (rid2, 데이터) 동일한 축소 인스턴스에 도달합니다.
여기에서 일부 처리를 수행하고 유사한 뚜껑을 출력합니다.
내 프로그램은 10MB와 같은 작은 데이터 세트에서도 문제없이 실행됩니다. 그러나 위의 이유와 함께 데이터가 1G로 증가하면 실패합니다. 나는 이것이 왜 일어나는 지 모른다. 도와주세요!
코드 줄이기 :
아래에 두 개의 클래스가 있습니다.
가. VCLReduce0SPlit
public class VCLReduce0Split extends MapReduceBase implements Reducer<Text, Text, Text, Text>{
// @SuppressWarnings("unchecked")
public void reduce (Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
String key_str = key.toString();
StringTokenizer stk = new StringTokenizer(key_str);
String t = stk.nextToken();
HashMap<String, String> hmap = new HashMap<String, String>();
while(values.hasNext())
{
StringBuffer sbuf1 = new StringBuffer();
String val = values.next().toString();
StringTokenizer st = new StringTokenizer(val);
String uid = st.nextToken();
String data = st.nextToken();
int total_size = 0;
StringTokenizer stx = new StringTokenizer(data,"|");
StringBuffer sbuf = new StringBuffer();
while(stx.hasMoreTokens())
{
String data_part = stx.nextToken();
String data_freq = stx.nextToken();
// System.out.println("data_part:----->"+data_part+" data_freq:----->"+data_freq);
sbuf.append(data_part);
sbuf.append("|");
sbuf.append(data_freq);
sbuf.append("|");
}
/*
for(int i = 0; i<parts.length-1; i++)
{
System.out.println("data:--------------->"+data);
int part_size = Integer.parseInt(parts[i+1]);
sbuf.append(parts[i]);
sbuf.append("|");
sbuf.append(part_size);
sbuf.append("|");
total_size = part_size+total_size;
i++;
}*/
sbuf1.append(String.valueOf(total_size));
sbuf1.append(",");
sbuf1.append(sbuf);
if(uid.equals("203664471")){
// System.out.println("data:--------------------------->"+data+" tot_size:---->"+total_size+" sbuf:------->"+sbuf);
}
hmap.put(uid, sbuf1.toString());
}
float threshold = (float)0.8;
CoreSplit obj = new CoreSplit();
ArrayList<CustomMapSimilarity> al = obj.similarityCalculation(t, hmap, threshold);
for(int i = 0; i<al.size(); i++)
{
CustomMapSimilarity cmaps = al.get(i);
String xy_pair = cmaps.getRIDPair();
String similarity = cmaps.getSimilarity();
output.collect(new Text(xy_pair), new Text(similarity));
}
}
}
b. coreSplit
package com.a;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeMap;
import org.apache.commons.collections.map.MultiValueMap;
public class PPJoinPlusCoreOptNewSplit{
public ArrayList<CustomMapSimilarity> similarityCalculation(String time, HashMap<String,String>hmap, float t)
{
ArrayList<CustomMapSimilarity> als = new ArrayList<CustomMapSimilarity>();
ArrayList<CustomMapSimilarity> alsim = new ArrayList<CustomMapSimilarity>();
Iterator<String> iter = hmap.keySet().iterator();
MultiValueMap index = new MultiValueMap();
String RID;
TreeMap<String, Integer> hmap2;
Iterator<String> iter1;
int size;
float prefix_size;
HashMap<String, Float> alpha;
HashMap<String, CustomMapOverlap> hmap_overlap;
String data;
while(iter.hasNext())
{
RID = (String)iter.next();
String data_val = hmap.get(RID);
StringTokenizer st = new StringTokenizer(data_val,",");
// System.out.println("data_val:--**********-->"+data_val+" RID:------------>"+RID+" time::---?"+time);
String RIDsize = st.nextToken();
size = Integer.parseInt(RIDsize);
data = st.nextToken();
StringTokenizer st1 = new StringTokenizer(data,"\\|");
String[] parts = data.split("\\|");
// hmap2 = (TreeMap<String, Integer>)hmap.get(RID);
// iter1 = hmap2.keySet().iterator();
// size = hmap_size.get(RID);
prefix_size = (float)(size-(0.8*size)+1);
if(size==1)
{
prefix_size = 1;
}
alpha = new HashMap<String, Float>();
hmap_overlap = new HashMap<String, CustomMapOverlap>();
// Iterator<String> iter2 = hmap2.keySet().iterator();
int prefix_index = 0;
int pi=0;
for(float j = 0; j<=prefix_size; j++)
{
boolean prefix_chk = false;
prefix_index++;
String ptoken = parts[pi];
// System.out.println("data:---->"+data+" ptoken:---->"+ptoken);
float val = Float.parseFloat(parts[pi+1]);
float temp_j = j;
j = j+val;
boolean j_l = false ;
float prefix_contri = 0;
pi= pi+2;
if(j>prefix_size)
{
// prefix_contri = j-temp_j;
prefix_contri = prefix_size-temp_j;
if(prefix_contri>0)
{
j_l = true;
prefix_chk = false;
}
else
{
prefix_chk = true;
}
}
if(prefix_chk == false){
filters(index, ptoken, RID, hmap,t, size, val, j_l, alpha, hmap_overlap, j, prefix_contri);
CustomMapPrefixTokens cmapt = new CustomMapPrefixTokens(RID,j);
index.put(ptoken, cmapt);
}
}
als = calcSimilarity(time, RID, hmap, alpha, hmap_overlap);
for(int i = 0; i<als.size(); i++)
{
if(als.get(i).getRIDPair()!=null)
{
alsim.add(als.get(i));
}
}
}
return alsim;
}
public void filters(MultiValueMap index, String ptoken, String RID, HashMap<String, String> hmap, float t, int size, float val, boolean j_l, HashMap<String, Float> alpha, HashMap<String, CustomMapOverlap> hmap_overlap, float j, float prefix_contri)
{
@SuppressWarnings("unchecked")
ArrayList<CustomMapPrefixTokens> positions_list = (ArrayList<CustomMapPrefixTokens>) index.get(ptoken);
if((positions_list!=null) &&(positions_list.size()!=0))
{
CustomMapPrefixTokens cmapt ;
String y;
Iterator<String> iter3;
int y_size = 0;
float check_size = 0;
// TreeMap<String, Integer> hmapy;
float RID_val=0;
float y_overlap = 0;
float ubound = 0;
ArrayList<Float> fl = new ArrayList<Float>();
StringTokenizer st;
for(int k = 0; k<positions_list.size(); k++)
{
cmapt = positions_list.get(k);
if(!cmapt.getRID().equals(RID))
{
y = hmap.get(cmapt.getRID());
// iter3 = y.keySet().iterator();
String yRID = cmapt.getRID();
st = new StringTokenizer(y,",");
y_size = Integer.parseInt(st.nextToken());
check_size = (float)0.8*(size);
if(y_size>=check_size)
{
//hmapy = hmap.get(yRID);
String y_data = st.nextToken();
StringTokenizer st1 = new StringTokenizer(y_data,"\\|");
while(st1.hasMoreTokens())
{
String token = st1.nextToken();
if(token.equals(ptoken))
{
String nxt_token = st1.nextToken();
// System.out.println("ydata:--->"+y_data+" nxt_token:--->"+nxt_token);
RID_val = (float)Integer.parseInt(nxt_token);
break;
}
}
// RID_val = (float) hmapy.get(ptoken);
float alpha1 = (float)(0.8/1.8)*(size+y_size);
fl = overlapCalc(alpha1, size, y_size, cmapt, j, alpha, j_l,RID_val,val,prefix_contri);
ubound = fl.get(0);
y_overlap = fl.get(1);
positionFilter(ubound, alpha1, cmapt, y_overlap, hmap_overlap);
}
}
}
}
}
public void positionFilter( float ubound,float alpha1, CustomMapPrefixTokens cmapt, float y_overlap, HashMap<String, CustomMapOverlap> hmap_overlap)
{
float y_overlap_total = 0;
if(null!=hmap_overlap.get(cmapt.getRID()))
{
y_overlap_total = hmap_overlap.get(cmapt.getRID()).getOverlap();
if((y_overlap_total+ubound)>=alpha1)
{
CustomMapOverlap cmap_tmp = hmap_overlap.get(cmapt.getRID());
float y_o_t = y_overlap+y_overlap_total;
cmap_tmp.setOverlap(y_o_t);
hmap_overlap.put(cmapt.getRID(),cmap_tmp);
}
else
{
float n = 0;
hmap_overlap.put(cmapt.getRID(), new CustomMapOverlap(cmapt.getRID(),n));
}
}
else
{
CustomMapOverlap cmap_tmp = new CustomMapOverlap(cmapt.getRID(),y_overlap);
hmap_overlap.put(cmapt.getRID(), cmap_tmp);
}
}
public ArrayList<Float> overlapCalc(float alpha1, int size, int y_size, CustomMapPrefixTokens cmapt, float j, HashMap<String, Float> alpha, boolean j_l, float RID_val, float val, float prefix_contri )
{
alpha.put(cmapt.getRID(), alpha1);
float min1 = y_size-cmapt.getPosition();
float min2 = size-j;
float min = 0;
float y_overlap = 0;
if(min1<min2)
{
min = min1;
}
else
{
min = min2;
}
if(j_l==true)
{
val = prefix_contri;
}
if(RID_val<val)
{
y_overlap = RID_val;
}
else
{
y_overlap = val;
}
float ubound = y_overlap+min;
ArrayList<Float> fl = new ArrayList<Float>();
fl.add(ubound);
fl.add(y_overlap);
return fl;
}
public ArrayList<CustomMapSimilarity> calcSimilarity( String time, String RID, HashMap<String,String> hmap , HashMap<String, Float> alpha, HashMap<String, CustomMapOverlap> hmap_overlap)
{
float jaccard = 0;
CustomMapSimilarity cms = new CustomMapSimilarity(null, null);
ArrayList<CustomMapSimilarity> alsim = new ArrayList<CustomMapSimilarity>();
Iterator<String> iter = hmap_overlap.keySet().iterator();
while(iter.hasNext())
{
String key = (String)iter.next();
CustomMapOverlap val = (CustomMapOverlap)hmap_overlap.get(key);
float overlap = (float)val.getOverlap();
if(overlap>0)
{
String yRID = val.getRID();
String RIDpair = RID+" "+yRID;
jaccard = unionIntersection(hmap, RIDpair);
if(jaccard>0.8)
{
cms = new CustomMapSimilarity(time+" "+RIDpair, String.valueOf(jaccard));
alsim.add(cms);
}
}
}
return alsim;
}
public float unionIntersection( HashMap<String,String> hmap, String RIDpair)
{
StringTokenizer st = new StringTokenizer(RIDpair);
String xRID = st.nextToken();
String yRID = st.nextToken();
String xdata = hmap.get(xRID);
String ydata = hmap.get(yRID);
int total_union = 0;
int xval = 0;
int yval = 0;
int part_union = 0;
int total_intersect = 0;
// System.out.println("xdata:------*************>"+xdata);
StringTokenizer xtokenizer = new StringTokenizer(xdata,",");
StringTokenizer ytokenizer = new StringTokenizer(ydata,",");
// String[] xpart = xdata.split(",");
// String[] ypart = ydata.split(",");
xtokenizer.nextToken();
ytokenizer.nextToken();
String datax = xtokenizer.nextToken();
String datay = ytokenizer.nextToken();
HashMap<String,Integer> x = new HashMap<String, Integer>();
HashMap<String,Integer> y = new HashMap<String, Integer>();
String [] xparts;
xparts = datax.toString().split("\\|");
String [] yparts;
yparts = datay.toString().split("\\|");
for(int i = 0; i<xparts.length-1; i++)
{
int part_size = Integer.parseInt(xparts[i+1]);
x.put(xparts[i], part_size);
i++;
}
for(int i = 0; i<yparts.length-1; i++)
{
int part_size = Integer.parseInt(yparts[i+1]);
y.put(xparts[i], part_size);
i++;
}
Set<String> xset = x.keySet();
Set<String> yset = y.keySet();
for(String elm:xset )
{
yval = 0;
xval = (Integer)x.get(elm);
part_union = 0;
int part_intersect = 0;
if(yset.contains(elm)){
yval = (Integer) y.get(elm);
if(xval>yval)
{
part_union = xval;
part_intersect = yval;
}
else
{
part_union = yval;
part_intersect = xval;
}
total_intersect = total_intersect+part_intersect;
}
else
{
part_union = xval;
}
total_union = total_union+part_union;
}
for(String elm: yset)
{
part_union = 0;
if(!xset.contains(elm))
{
part_union = (Integer) y.get(elm);
total_union = total_union+part_union;
}
}
float jaccard = (float)total_intersect/total_union;
return jaccard;
}
}
해결법
-
==============================
1.시간 초과의 원인은 진행률을 Hadoop 프레임 워크에보고하지 않고 감속기에서 장기간 계산 일 수 있습니다. 이는 다른 접근 방식을 사용하여 해결할 수 있습니다.
시간 초과의 원인은 진행률을 Hadoop 프레임 워크에보고하지 않고 감속기에서 장기간 계산 일 수 있습니다. 이는 다른 접근 방식을 사용하여 해결할 수 있습니다.
I. mapred-site.xml의 시간 초과 늘리기 :
<property> <name>mapred.task.timeout</name> <value>1200000</value> </property>
기본값은 600000 ms = 600 초입니다.
II. javadoc의 Reducer 예제와 같이 x 레코드마다 진행 상황을보고합니다.
public void reduce(K key, Iterator<V> values, OutputCollector<K, V> output, Reporter reporter) throws IOException { // report progress if ((noValues%10) == 0) { reporter.progress(); } // ... }
선택적으로 위의 예와 같이 맞춤 카운터를 증분 할 수 있습니다.
reporter.incrCounter(NUM_RECORDS, 1);
-
==============================
2.Java의 힙 공간을 모두 소비했거나 GC가 너무 자주 발생하여 감속기에 상태를 마스터로보고 기회가 주어지지 않아서 죽을 수도 있습니다.
Java의 힙 공간을 모두 소비했거나 GC가 너무 자주 발생하여 감속기에 상태를 마스터로보고 기회가 주어지지 않아서 죽을 수도 있습니다.
또 다른 가능성은 감속기 중 하나가 너무 기울어 져서 데이터를 가져 오는 것입니다. 즉, 특정 제거의 경우 많은 레코드가 있습니다.
다음 구성을 설정하여 Java 힙을 늘려보십시오. mapred.child.java.opts
에
-Xmx2048m
또한 다음 구성을 현재 값보다 낮은 값으로 설정하여 병렬 감속기의 수를 줄이고 시도하십시오 (기본값은 2).
mapred.tasktracker.reduce.tasks.maximum
from https://stackoverflow.com/questions/15281307/the-reduce-fails-due-to-task-attempt-failed-to-report-status-for-600-seconds-ki by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] Sqoop을 사용하여 MySQL에서 하이브로 데이터 가져 오기 (0) | 2019.07.10 |
---|---|
[HADOOP] elasticsearch를 중앙 데이터 저장소로 사용 (0) | 2019.07.10 |
[HADOOP] HIVE에서 Date Series를 생성하는 방법은 무엇입니까? (테이블 만들기) (0) | 2019.07.10 |
[HADOOP] 파일 이름 + 오프셋을 포함하도록 SequenceFileInputFormat 확장 (0) | 2019.07.10 |
[HADOOP] 기본 HBase 테이블을 사용하는 튜닝 하이브 쿼리 (0) | 2019.07.10 |