복붙노트

[HADOOP] 스파크가 java.io.IOException로 큰 셔플 작업에 실패합니다 : 파일 시스템이 닫혔습니다.

HADOOP

스파크가 java.io.IOException로 큰 셔플 작업에 실패합니다 : 파일 시스템이 닫혔습니다.

나는 종종 도움이되지 않는 무의미한 예외로 커다란 직업으로 스파크가 실패하는 것을 발견한다. 작업자 로그는 정상적으로 보이며 오류는 발생하지 않지만 "KILLED"상태가됩니다. 이것은 큰 셔플 (chuffles)에서 매우 일반적이므로, .distinct와 같은 작업이 가능합니다.

문제는 무엇이 잘못되었는지 진단하는 방법이며 이상적으로 어떻게 해결할 수 있습니까?

이러한 작업 중 많은 부분이 단순한 것이라는 점을 감안할 때 데이터를 10 개의 청크로 분할하고 각 청크에서 응용 프로그램을 실행 한 다음 모든 결과 출력에서 ​​응용 프로그램을 실행하여 문제를 해결하려고 노력했습니다. 다른 말로하면 - 메타 -지도 - 감소.

14/06/04 12:56:09 ERROR client.AppClient$ClientActor: Master removed our application: FAILED; stopping client
14/06/04 12:56:09 WARN cluster.SparkDeploySchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection...
14/06/04 12:56:09 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:703)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:779)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:840)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at org.apache.hadoop.io.compress.DecompressorStream.getCompressedData(DecompressorStream.java:159)
    at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:143)
    at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:164)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:149)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:257)
    at scala.collection.AbstractIterator.toList(Iterator.scala:1157)
    at $line5.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:13)
    at $line5.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:13)
    at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:450)
    at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:450)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

해결법

  1. ==============================

    1.2014 년 9 월 1 일부터 Spark의 '개선 된 기능'입니다. https://issues.apache.org/jira/browse/SPARK-3052를 참조하십시오. 주어진 링크에서 syrza가 지적했듯이, executor가 실패했을 때이 메시지를 초래할 때 종료 훅은 잘못된 순서로 수행 될 가능성이 큽니다. 문제의 주요 원인 (즉, 집행자가 실패한 이유)을 파악하기 위해 더 많은 조사를해야 할 것입니다. 커다란 셔플 인 경우, 메모리 부족 오류로 인해 실행 프로그램 오류가 발생하여 Hadoop 파일 시스템이 종료 훅에서 닫히게됩니다. 따라서 Executor의 실행중인 작업에서 RecordReaders는 "java.io.IOException : Filesystem closed"예외를 발생시킵니다. 나는 그것이 다음 릴리스에서 해결 될 것 같아요 그리고 당신은 더 유용한 오류 메시지를 얻을 것이다 :)

    2014 년 9 월 1 일부터 Spark의 '개선 된 기능'입니다. https://issues.apache.org/jira/browse/SPARK-3052를 참조하십시오. 주어진 링크에서 syrza가 지적했듯이, executor가 실패했을 때이 메시지를 초래할 때 종료 훅은 잘못된 순서로 수행 될 가능성이 큽니다. 문제의 주요 원인 (즉, 집행자가 실패한 이유)을 파악하기 위해 더 많은 조사를해야 할 것입니다. 커다란 셔플 인 경우, 메모리 부족 오류로 인해 실행 프로그램 오류가 발생하여 Hadoop 파일 시스템이 종료 훅에서 닫히게됩니다. 따라서 Executor의 실행중인 작업에서 RecordReaders는 "java.io.IOException : Filesystem closed"예외를 발생시킵니다. 나는 그것이 다음 릴리스에서 해결 될 것 같아요 그리고 당신은 더 유용한 오류 메시지를 얻을 것이다 :)

  2. ==============================

    2.DFSClient.close () 또는 DFSClient.abort ()를 호출하여 클라이언트를 닫습니다. 다음 파일 조작은 위의 예외를 초래합니다.

    DFSClient.close () 또는 DFSClient.abort ()를 호출하여 클라이언트를 닫습니다. 다음 파일 조작은 위의 예외를 초래합니다.

    나는 close () / abort ()를 호출하는 것을 알아 내려고 노력할 것이다. 디버거에서 중단 점을 사용하거나 Hadoop 소스 코드를 수정하여 이러한 메서드에서 예외를 throw하여 스택 추적을 얻을 수 있습니다.

  3. ==============================

    3.spark 작업이 클러스터에서 실행중인 경우 "파일 시스템을 닫음"에 대한 예외가 해결 될 수 있습니다. spark.executor.cores, spark.driver.cores 및 spark.akka.threads와 같은 등록 정보를 자원 가용성에 따라 최대 값으로 설정할 수 있습니다. 내 데이터 세트가 JSON 데이터로 2 천만 개에 이르는 레코드를 사용했을 때도 동일한 문제가 발생했습니다. 나는 위의 속성들로 그것을 고쳤으며 매력처럼 보였다. 필자의 경우,이 속성을 각각 25, 25 및 20으로 설정했습니다. 희망이 도움이 !!

    spark 작업이 클러스터에서 실행중인 경우 "파일 시스템을 닫음"에 대한 예외가 해결 될 수 있습니다. spark.executor.cores, spark.driver.cores 및 spark.akka.threads와 같은 등록 정보를 자원 가용성에 따라 최대 값으로 설정할 수 있습니다. 내 데이터 세트가 JSON 데이터로 2 천만 개에 이르는 레코드를 사용했을 때도 동일한 문제가 발생했습니다. 나는 위의 속성들로 그것을 고쳤으며 매력처럼 보였다. 필자의 경우,이 속성을 각각 25, 25 및 20으로 설정했습니다. 희망이 도움이 !!

    참조 링크 :

    http://spark.apache.org/docs/latest/configuration.html

  4. from https://stackoverflow.com/questions/24038908/spark-fails-on-big-shuffle-jobs-with-java-io-ioexception-filesystem-closed by cc-by-sa and MIT license