복붙노트

[HADOOP] Pyspark java.lang.OutOfMemoryError : 요청 된 배열 크기가 VM 한계를 초과합니다

HADOOP

Pyspark java.lang.OutOfMemoryError : 요청 된 배열 크기가 VM 한계를 초과합니다

Pyspark 작업을 실행 중입니다.

spark-submit --master yarn-client --driver-memory 150G --num-executors 8 --executor-cores 4 --executor-memory 150G benchmark_script_1.py hdfs:///tmp/data/sample150k 128 hdfs:///tmp/output/sample150k | tee ~/output/sample150k.log

직업 자체는 꽤 표준입니다. 단지 일부 파일을 가져 와서 계산합니다. :

print(str(datetime.now()) + " - Ingesting files...")
files = sc.wholeTextFiles(inputFileDir, partitions)
fileCount = files.count()
print(str(datetime.now()) + " - " + str(fileCount) + " files ingested")

소스 폴더에는 ~ 150'000 개의 파일이 있습니다. 복제가없는 약 35G, 복제가있는 105G입니다. 상당히 무겁지만 미쳤습니다.

위를 실행하면 다음과 같은 스택 추적이 제공됩니다.

15/08/11 15:39:20 WARN TaskSetManager: Lost task 61.3 in stage 0.0 (TID 76, <NODE>): java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
        at java.io.DataInputStream.read(DataInputStream.java:100)
        at org.spark-project.guava.io.ByteStreams.copy(ByteStreams.java:207)
        at org.spark-project.guava.io.ByteStreams.toByteArray(ByteStreams.java:252)
        at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:83)
        at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:69)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)

자세한 정보는 문제가되는 executor 로그에서 찾을 수 있습니다.

15/08/11 12:28:18 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
15/08/11 12:28:18 ERROR util.Utils: Uncaught exception in thread stdout writer for python
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.lang.StringCoding.encode(StringCoding.java:350)
        at java.lang.String.getBytes(String.java:939)
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:573)
        at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:395)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:405)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/daemon.py", line 162, in manager
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/daemon.py", line 60, in worker
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/worker.py", line 126, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/serializers.py", line 528, in read_int
15/08/11 12:28:18 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for python,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.lang.StringCoding.encode(StringCoding.java:350)
        at java.lang.String.getBytes(String.java:939)
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:573)
        at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:395)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:405)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/daemon.py", line 162, in manager
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/daemon.py", line 60, in worker
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/worker.py", line 126, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/serializers.py", line 528, in read_int
15/08/11 12:28:18 ERROR executor.Executor: Exception in task 7.0 in stage 0.0 (TID 5)
java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
        at java.io.DataInputStream.read(DataInputStream.java:100)
        at org.spark-project.guava.io.ByteStreams.copy(ByteStreams.java:207)
        at org.spark-project.guava.io.ByteStreams.toByteArray(ByteStreams.java:252)
        at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:83)
        at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:69)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
    raise EOFError
EOFError

HDFS 캐시를 비활성화했습니다.

conf.set("fs.hdfs.impl.disable.cache", True)

스칼라에서 정확히 같은 스크립트에는 전혀 문제가 없습니다.

이것은 큰 작업이지만 사용 가능한 메모리가 많이 있습니다. 문제가 무엇인지 아는 사람이 있습니까?

최신 정보

JVM에 더 많은 메모리를 할당했습니다.

export set JAVA_OPTS="-Xmx6G -XX:MaxPermSize=2G -XX:+UseCompressedOops"

슬프게도 개선이 없습니다.

해결법

  1. ==============================

    1.spark-submit 및 Java와 비슷한 문제가 발생하여 8GB DataFrame을 절약합니다. 16 코어, 300GB RAM의 도커 컨테이너. 아직 문제를 해결하지 못했지만 몇 가지 가능한 해결 방법이 있습니다.

    spark-submit 및 Java와 비슷한 문제가 발생하여 8GB DataFrame을 절약합니다. 16 코어, 300GB RAM의 도커 컨테이너. 아직 문제를 해결하지 못했지만 몇 가지 가능한 해결 방법이 있습니다.

    77 페이지부터 Lightbend는 쉘에 문제가 있다고 제안합니다. @transient를 사용하거나 객체에 캡슐화하면 해결 방법이 될 수 있습니다. 이것은 어느 경우에도 적용되지 않는 것 같습니다.

    DataBricks는 spark.sql.shuffle.partitions를 늘리는 것이 도움이 될 수 있다고 제안합니다. 기본 '200'에서 '400'으로 변경하는 것이 좋습니다. spark-defaults.conf에서 '800'과 '2000'을 시도했지만 여전히 OOM 오류가 발생합니다.

    DataBricks는 또한 코드에서 DataFrame.repartition (400)을 호출 할 것을 제안합니다. 또는 sc.wholeTextFiles (inputFileDir, partitions) 호출에 대한 마지막 매개 변수로 파티션 수를 늘리십시오.

    힙 크기가 32GB보다 큰 경우 -XX : + UseCompressedOops가 비활성화되어 (Java 8에서) StackOverflow의 JAVA_OPTS 제안이 적용되지 않습니다.

    편집하다

    또한 시도 :

    가능한 해결 방법

  2. from https://stackoverflow.com/questions/31945007/pyspark-java-lang-outofmemoryerror-requested-array-size-exceeds-vm-limit by cc-by-sa and MIT license