복붙노트

[HADOOP] 왜이 Pig UDF 결과는 "오류 : Java 힙 공간"에 데이터 벅을 디스크에 쏟아 놓았습니까?

HADOOP

왜이 Pig UDF 결과는 "오류 : Java 힙 공간"에 데이터 벅을 디스크에 쏟아 놓았습니까?

여기 내 UDF가 있습니다 :

public DataBag exec(Tuple input) throws IOException { 
    Aggregate aggregatedOutput = null;

    int spillCount = 0;

    DataBag outputBag = BagFactory.newDefaultBag(); 
    DataBag values = (DataBag)input.get(0);
    for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
        Tuple tuple = iterator.next();
        //spillCount++;
        ...
        if (some condition regarding current input tuple){
            //do something to aggregatedOutput with information from input tuple
        } else {
            //Because input tuple does not apply to current aggregateOutput
            //return current aggregateOutput and apply input tuple
            //to new aggregateOutput
            Tuple returnTuple = aggregatedOutput.getTuple();
            outputBag.add(returnTuple);
            spillCount++;
            aggregatedOutputTuple = new Aggregate(tuple);


            if (spillCount == 1000) {
                outputBag.spill();
                spillCount = 0;
            }
        }
    }
    return outputBag; 
}

1,000 개의 입력 튜플마다 가방이 디스크에 쏟아집니다. 나는이 수치를 50으로 낮추고 100,000만큼 높게 설정했지만 여전히 메모리 오류가 발생한다 :

Pig logfile dump:

Backend error message
---------------------
Error: Java heap space

Pig Stack Trace
---------------
ERROR 2997: Unable to recreate exception from backed error: Error: Java heap space

이 문제를 해결하려면 어떻게해야합니까? 그것은 약 백만 행을 처리하고 있습니다.

Accumulator 인터페이스 사용 :

public class Foo extends EvalFunc<DataBag> implements Accumulator<DataBag> {
    private DataBag outputBag = null;
    private UltraAggregation currentAggregation = null;

    public void accumulate(Tuple input) throws IOException {
        DataBag values = (DataBag)input.get(0);
        Aggregate aggregatedOutput = null;
        outputBag = BagFactory.getInstance().newDefaultBag();

        for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
            Tuple tuple = iterator.next();
            ...
            if (some condition regarding current input tuple){
                //do something to aggregatedOutput with information from input tuple
            } else {
                //Because input tuple does not apply to current aggregateOutput
                //return current aggregateOutput and apply input tuple
                //to new aggregateOutput
                outputBag.add(aggregatedOutput.getTuple());
                aggregatedOutputTuple = new Aggregate(tuple);
            }
        }
    }

    // Called when all tuples from current key have been passed to accumulate
    public DataBag getValue() {
        //Add final current aggregation
        outputBag.add(currentAggregation.getTuple());
        return outputBag;
    }
    // This is called after getValue()
    // Not sure if these commands are necessary as they are repeated in beginning of accumulate
    public void cleanup() {
        outputBag = null;
        currentAggregation = null;
    }

    public DataBag exec(Tuple input) throws IOException {
        // Same as above ^^ but this doesn't appear to ever be called.
    }

    public Schema outputSchema(Schema input) {
        try {
            return new Schema(new FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), bagSchema, DataType.BAG));
        } catch {FrontendException e) {
            e.printStackTrace();
            return null;
        }
    }

    class Aggregate {
        ...
        public Tuple getTuple() {
            Tuple output = TupleFactory.getInstance().newTuple(OUTPUT_TUPLE_SIZE);
            try {
                output.set(0, val);
                ...
            } catch (ExecException e) {
                e.printStackTrace();
                return null;
            }
        }
        ...
    }
}

해결법

  1. ==============================

    1.iterator에서 튜플을 얻을 때마다가 아니라 outputBag에 추가 할 때마다 spillCount를 증가시켜야합니다. spillCount가 1000의 배수이고 if 조건이 충족되지 않을 때만 유출됩니다 (논리에 따라 자주 발생하지 않을 수도 있음). 이것은 왜 서로 다른 유출 임계 값에 대해 많은 차이가 없는지 설명 할 수 있습니다.

    iterator에서 튜플을 얻을 때마다가 아니라 outputBag에 추가 할 때마다 spillCount를 증가시켜야합니다. spillCount가 1000의 배수이고 if 조건이 충족되지 않을 때만 유출됩니다 (논리에 따라 자주 발생하지 않을 수도 있음). 이것은 왜 서로 다른 유출 임계 값에 대해 많은 차이가 없는지 설명 할 수 있습니다.

    문제가 해결되지 않으면 AccumulatorEvalFunc 를 확장 해보십시오. 귀하의 경우 실제로 가방 전체에 접근 할 필요가 없습니다. 현재 튜플에 대한 액세스 만 필요하기 때문에 구현은 누적 기 스타일 구현에 적합합니다. 이로 인해 메모리 사용이 줄어들 수 있습니다. 본질적으로 최종 출력을 누적하는 DataBag 유형의 인스턴스 변수가 있습니다. aggregateOutput에 대한 인스턴스 변수를 가지면 현재 집계를 가질 수 있습니다. accumulate () 호출은 1) 현재 집계를 업데이트하거나 2) 집계 된 집계를 aggregatedOutput에 추가하고 새 집계를 시작합니다. 이것은 기본적으로 for 루프의 본문을 따릅니다.

  2. from https://stackoverflow.com/questions/21567307/why-does-this-pig-udf-result-in-an-error-java-heap-space-given-that-i-am-spil by cc-by-sa and MIT license