복붙노트

[HADOOP] Spark의 Map Task에서 거대한 메모리 소비

HADOOP

Spark의 Map Task에서 거대한 메모리 소비

나는 대략 60.000.000 줄을 포함하는 많은 파일을 가지고있다. 모든 파일 형식은 {timestamp} # {producer} # {messageId} # {data_bytes} \ n 형식으로 지정됩니다. \ n

나는 하나씩 하나씩 파일을 살펴보고 입력 파일 하나당 하나의 출력 파일을 만들고 싶다. 일부 선은 이전 선에 의존하기 때문에 프로듀서별로 그룹화했습니다. 한 줄이 하나 이상의 이전 줄에 의존 할 때마다 그 줄은 항상 동일합니다. 모든 줄을 그룹화 한 후 필자는이를 Java 파서에 넘깁니다. 그런 다음 파서는 메모리에 파싱 된 모든 데이터 객체를 포함하고 나중에 JSON으로 출력합니다.

내 작업이 어떻게 처리되는지를 시각화하기 위해 다음 플로우 그래프를 작성했습니다. groupByKey-Shuffeling-Process를 시각화하지 않았습니다.

내 문제 :

내 질문 :

스칼라 코드 (관련없는 코드 부분을 생략했습니다) :

def main(args: Array[String]) {
    val inputFilePath = args(0)
    val outputFilePath = args(1)

    val inputFiles = fs.listStatus(new Path(inputFilePath))
    inputFiles.foreach( filename => {
        processData(filename.getPath, ...)
    }) 
}


def processData(filePath: Path, ...) {
    val lines  = sc.textFile(filePath.toString())
    val lineMap = lines.map(line => (line.split(" ")(1), line)).groupByKey()

    val parsedLines = lineMap.map{ case(key, values) => parseLinesByKey(key, values, config) }
    //each output should be saved separately
    parsedLines.saveAsTextFile(outputFilePath.toString() + "/" + filePath.getName)     
}


def parseLinesByKey(key: String, values: Iterable[String], config : Config) = {
    val importer = new LogFileImporter(...)
    importer.parseData(values.toIterator.asJava, ...)

    //importer from now contains all parsed data objects in memory that could be parsed 
    //from the given values.  

    val jsonMapper = getJsonMapper(...)
    val jsonStringData = jsonMapper.getValueFromString(importer.getDataObject)

    (key, jsonStringData)
}

해결법

  1. ==============================

    1.groupByKey 호출을 제거하고 새로운 FileInputFormat과 RecordReader를 구현하여 라인이 다른 라인에 의존한다는 내 제한 사항을 제거하여이 문제를 해결했습니다. 지금은 각 분할에 이전 분할의 50.000 바이트 오버 헤드가 포함될 수 있도록 구현했습니다. 이렇게하면 이전 행에 의존하는 모든 행을 올바르게 파싱 할 수 있습니다.

    groupByKey 호출을 제거하고 새로운 FileInputFormat과 RecordReader를 구현하여 라인이 다른 라인에 의존한다는 내 제한 사항을 제거하여이 문제를 해결했습니다. 지금은 각 분할에 이전 분할의 50.000 바이트 오버 헤드가 포함될 수 있도록 구현했습니다. 이렇게하면 이전 행에 의존하는 모든 행을 올바르게 파싱 할 수 있습니다.

    이제 계속 진행하면서 이전 분할의 마지막 50,000 바이트를 살펴보고 현재 분할의 구문 분석에 실제로 영향을주는 행만 복사합니다. 따라서 오버 헤드를 최소화하고 여전히 병렬 처리가 가능한 작업을 얻습니다.

    다음 링크는 저를 올바른 방향으로 끌고갔습니다. FileInputFormat / RecordReader의 주제가 첫눈에 상당히 복잡했기 때문에 (적어도 저에게 맞았습니다),이 기사를 읽고이 내용이 자신의 문제에 적합한 지 아닌지 이해하는 것이 좋습니다.

    웹 사이트가 다운 될 경우를 대비 한 ae.be 기사의 관련 코드 부분. 저자 (@Gurdt)는이 메시지를 사용하여 채팅 메시지에 이스케이프 처리 된 회신이 있는지 ( "\"로 끝나는 지) 검색하고 이스케이프 처리 된 회선을 이스케이프 처리되지 않은 \ n이 발견 될 때까지 첨부합니다. 이렇게하면 둘 이상의 행에 걸쳐있는 메시지를 검색 할 수 있습니다. 스칼라로 작성된 코드 :

    용법

    val conf = new Configuration(sparkContext.hadoopConfiguration)
    val rdd = sparkContext.newAPIHadoopFile("data.txt", classOf[MyFileInputFormat],
    classOf[LongWritable], classOf[Text], conf)
    

    FileInputFormat

    class MyFileInputFormat extends FileInputFormat[LongWritable, Text] {
        override def createRecordReader(split: InputSplit, context: TaskAttemptContext):
        RecordReader[LongWritable, Text] = new MyRecordReader()
    }
    

    RecordReader

    class MyRecordReader() extends RecordReader[LongWritable, Text] {
        var start, end, position = 0L
        var reader: LineReader = null
        var key = new LongWritable
        var value = new Text
    
        override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
            // split position in data (start one byte earlier to detect if
            // the split starts in the middle of a previous record)
            val split = inputSplit.asInstanceOf[FileSplit]
            start = 0.max(split.getStart - 1)
            end = start + split.getLength
    
            // open a stream to the data, pointing to the start of the split
            val stream = split.getPath.getFileSystem(context.getConfiguration)
            .open(split.getPath)
            stream.seek(start)
            reader = new LineReader(stream, context.getConfiguration)
    
            // if the split starts at a newline, we want to start yet another byte
            // earlier to check if the newline was escaped or not
            val firstByte = stream.readByte().toInt
            if(firstByte == '\n')
                start = 0.max(start - 1)
            stream.seek(start)
    
            if(start != 0)
                skipRemainderFromPreviousSplit(reader)
        }
    
        def skipRemainderFromPreviousSplit(reader: LineReader): Unit = {
            var readAnotherLine = true
            while(readAnotherLine) {
                // read next line
                val buffer = new Text()
                start += reader.readLine(buffer, Integer.MAX_VALUE, Integer.MAX_VALUE)
                pos = start
    
                // detect if delimiter was escaped
                readAnotherLine = buffer.getLength >= 1 && // something was read
                buffer.charAt(buffer.getLength - 1) == '\\' && // newline was escaped
                pos <= end // seek head hasn't passed the split
            }
        }
    
        override def nextKeyValue(): Boolean = {
            key.set(pos)
    
            // read newlines until an unescaped newline is read
            var lastNewlineWasEscaped = false
            while (pos < end || lastNewlineWasEscaped) {
                // read next line
                val buffer = new Text
                pos += reader.readLine(buffer, Integer.MAX_VALUE, Integer.MAX_VALUE)
    
                // append newly read data to previous data if necessary
                value = if(lastNewlineWasEscaped) new Text(value + "\n" + buffer) else buffer
    
                // detect if delimiter was escaped
                lastNewlineWasEscaped = buffer.charAt(buffer.getLength - 1) == '\\'
    
                // let Spark know that a key-value pair is ready!
                if(!lastNewlineWasEscaped)
                    return true
            }
    
            // end of split reached?
            return false
        }
    }
    

    참고 : getCurrentKey, getCurrentValue, close 및 getProgress를 RecordReader에도 구현해야 할 수도 있습니다.

  2. from https://stackoverflow.com/questions/37569635/huge-memory-consumption-in-map-task-in-spark by cc-by-sa and MIT license