[HADOOP] 왜이 Pig UDF 결과는 "오류 : Java 힙 공간"에 데이터 벅을 디스크에 쏟아 놓았습니까?
HADOOP왜이 Pig UDF 결과는 "오류 : Java 힙 공간"에 데이터 벅을 디스크에 쏟아 놓았습니까?
여기 내 UDF가 있습니다 :
public DataBag exec(Tuple input) throws IOException {
Aggregate aggregatedOutput = null;
int spillCount = 0;
DataBag outputBag = BagFactory.newDefaultBag();
DataBag values = (DataBag)input.get(0);
for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
Tuple tuple = iterator.next();
//spillCount++;
...
if (some condition regarding current input tuple){
//do something to aggregatedOutput with information from input tuple
} else {
//Because input tuple does not apply to current aggregateOutput
//return current aggregateOutput and apply input tuple
//to new aggregateOutput
Tuple returnTuple = aggregatedOutput.getTuple();
outputBag.add(returnTuple);
spillCount++;
aggregatedOutputTuple = new Aggregate(tuple);
if (spillCount == 1000) {
outputBag.spill();
spillCount = 0;
}
}
}
return outputBag;
}
1,000 개의 입력 튜플마다 가방이 디스크에 쏟아집니다. 나는이 수치를 50으로 낮추고 100,000만큼 높게 설정했지만 여전히 메모리 오류가 발생한다 :
Pig logfile dump:
Backend error message
---------------------
Error: Java heap space
Pig Stack Trace
---------------
ERROR 2997: Unable to recreate exception from backed error: Error: Java heap space
이 문제를 해결하려면 어떻게해야합니까? 그것은 약 백만 행을 처리하고 있습니다.
Accumulator 인터페이스 사용 :
public class Foo extends EvalFunc<DataBag> implements Accumulator<DataBag> {
private DataBag outputBag = null;
private UltraAggregation currentAggregation = null;
public void accumulate(Tuple input) throws IOException {
DataBag values = (DataBag)input.get(0);
Aggregate aggregatedOutput = null;
outputBag = BagFactory.getInstance().newDefaultBag();
for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
Tuple tuple = iterator.next();
...
if (some condition regarding current input tuple){
//do something to aggregatedOutput with information from input tuple
} else {
//Because input tuple does not apply to current aggregateOutput
//return current aggregateOutput and apply input tuple
//to new aggregateOutput
outputBag.add(aggregatedOutput.getTuple());
aggregatedOutputTuple = new Aggregate(tuple);
}
}
}
// Called when all tuples from current key have been passed to accumulate
public DataBag getValue() {
//Add final current aggregation
outputBag.add(currentAggregation.getTuple());
return outputBag;
}
// This is called after getValue()
// Not sure if these commands are necessary as they are repeated in beginning of accumulate
public void cleanup() {
outputBag = null;
currentAggregation = null;
}
public DataBag exec(Tuple input) throws IOException {
// Same as above ^^ but this doesn't appear to ever be called.
}
public Schema outputSchema(Schema input) {
try {
return new Schema(new FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), bagSchema, DataType.BAG));
} catch {FrontendException e) {
e.printStackTrace();
return null;
}
}
class Aggregate {
...
public Tuple getTuple() {
Tuple output = TupleFactory.getInstance().newTuple(OUTPUT_TUPLE_SIZE);
try {
output.set(0, val);
...
} catch (ExecException e) {
e.printStackTrace();
return null;
}
}
...
}
}
해결법
-
==============================
1.iterator에서 튜플을 얻을 때마다가 아니라 outputBag에 추가 할 때마다 spillCount를 증가시켜야합니다. spillCount가 1000의 배수이고 if 조건이 충족되지 않을 때만 유출됩니다 (논리에 따라 자주 발생하지 않을 수도 있음). 이것은 왜 서로 다른 유출 임계 값에 대해 많은 차이가 없는지 설명 할 수 있습니다.
iterator에서 튜플을 얻을 때마다가 아니라 outputBag에 추가 할 때마다 spillCount를 증가시켜야합니다. spillCount가 1000의 배수이고 if 조건이 충족되지 않을 때만 유출됩니다 (논리에 따라 자주 발생하지 않을 수도 있음). 이것은 왜 서로 다른 유출 임계 값에 대해 많은 차이가 없는지 설명 할 수 있습니다.
문제가 해결되지 않으면 AccumulatorEvalFunc
를 확장 해보십시오. 귀하의 경우 실제로 가방 전체에 접근 할 필요가 없습니다. 현재 튜플에 대한 액세스 만 필요하기 때문에 구현은 누적 기 스타일 구현에 적합합니다. 이로 인해 메모리 사용이 줄어들 수 있습니다. 본질적으로 최종 출력을 누적하는 DataBag 유형의 인스턴스 변수가 있습니다. aggregateOutput에 대한 인스턴스 변수를 가지면 현재 집계를 가질 수 있습니다. accumulate () 호출은 1) 현재 집계를 업데이트하거나 2) 집계 된 집계를 aggregatedOutput에 추가하고 새 집계를 시작합니다. 이것은 기본적으로 for 루프의 본문을 따릅니다.
from https://stackoverflow.com/questions/21567307/why-does-this-pig-udf-result-in-an-error-java-heap-space-given-that-i-am-spil by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] Hive의 hour () 함수는 12 시간 시계 값을 반환합니다. (0) | 2019.08.01 |
---|---|
[HADOOP] Hive의 '테이블 <테이블 이름> 연결'작업을 어떻게 변경합니까? (0) | 2019.08.01 |
[HADOOP] Map-Reduce / Hadoop을 정수 값으로 정렬 (MRJob 사용) (0) | 2019.08.01 |
[HADOOP] hadoop 작업 추적기를 시작할 수 없습니다. (0) | 2019.08.01 |
[HADOOP] Spark는 유언 집행자와 코어의 수와 관계를 형성합니다. (0) | 2019.08.01 |