[HADOOP] SparkR job times out after 100 minutes
I wrote a somewhat complex SparkR script and ran it with spark-submit. The script essentially reads rows from a large Hive/Impala Parquet-backed table and generates a new Parquet file with the same number of rows. However, the job seems to stop after exactly 100 minutes, as if some timeout had expired.
I have checked every parameter I know of, and tested every possible setting that could carry a value of 100 minutes, but I could not find a solution.
[user@localhost R]$ time spark-submit sparkr-pre.R
Loading required package: methods
Attaching package: ‘SparkR’
The following objects are masked from ‘package:stats’:
filter, na.omit
The following objects are masked from ‘package:base’:
intersect, rbind, sample, subset, summary, table, transform
15/12/30 18:04:27 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
[Stage 1:========================================> (7 + 3) / 10]Error in if (returnStatus != 0) { : argument is of length zero
Calls: write.df -> write.df -> .local -> callJMethod -> invokeJava
Execution halted
15/12/30 19:44:52 ERROR InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:703)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:702)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1514)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1438)
at org.apache.spark.SparkContext$$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1724)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1723)
at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:587)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:264)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:234)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:234)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:234)
at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:216)
at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1855)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:132)
at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:79)
at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:38)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
15/12/30 19:44:52 ERROR DefaultWriterContainer: Job job_201512301804_0000 aborted.
15/12/30 19:44:52 ERROR RBackendHandler: save on 25 failed
real 100m30.944s
user 1m26.326s
sys 0m19.459s
Environment: Runtime Information
Name Value
Java Home /usr/java/jdk1.8.0_40/jre
Java Version 1.8.0_40 (Oracle Corporation)
Scala Version version 2.10.4
Spark Properties
Name Value
spark.akka.frameSize 1024
spark.app.id application_1451466100034_0019
spark.app.name SparkR
spark.driver.appUIAddress http://x.x.x.x:4040
spark.driver.host x.x.x.x
spark.driver.maxResultSize 8G
spark.driver.memory 100G
spark.driver.port 60471
spark.executor.id driver
spark.executor.memory 14G
spark.executorEnv.LD_LIBRARY_PATH $LD_LIBRARY_PATH:/usr/lib64/R/lib:/usr/local/lib64:/usr/lib/jvm/jre/lib/amd64/server:/usr/lib/jvm/jre/lib/amd64:/usr/lib/jvm/java/lib/amd64:/usr/java/packages/lib/amd64:/lib:/usr/lib::/usr/lib/hadoop/lib/native
spark.externalBlockStore.folderName spark-b60f685e-c46c-435d-ab1b-c9d1279f630f
spark.fileserver.uri http://x.x.x.x:50281
spark.home /datas/spark-1.5.2-bin-hadoop2.6
spark.kryoserializer.buffer.max 2000M
spark.master yarn-client
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS CDHPR1.dc.dialog.lk
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES http://CDHPR1.dc.dialog.lk:8088/proxy/application_1451466100034_0019
spark.scheduler.mode FIFO
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.parquet.binaryAsString true
spark.submit.deployMode client
spark.ui.filters org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
spark.yarn.dist.archives file:/datas/spark-1.5.2-bin-hadoop2.6/R/lib/sparkr.zip#sparkr
spark.yarn.dist.files file:/home/inuser/R/sparkr-pre.R
System Properties
Name Value
SPARK_SUBMIT true
SPARK_YARN_MODE true
awt.toolkit sun.awt.X11.XToolkit
file.encoding UTF-8
file.encoding.pkg sun.io
file.separator /
java.awt.graphicsenv sun.awt.X11GraphicsEnvironment
java.awt.printerjob sun.print.PSPrinterJob
java.class.version 52.0
java.endorsed.dirs /usr/java/jdk1.8.0_40/jre/lib/endorsed
java.ext.dirs /usr/java/jdk1.8.0_40/jre/lib/ext:/usr/java/packages/lib/ext
java.home /usr/java/jdk1.8.0_40/jre
java.io.tmpdir /tmp
java.library.path :/usr/lib/hadoop/lib/native:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
java.runtime.name Java(TM) SE Runtime Environment
java.runtime.version 1.8.0_40-b26
java.specification.name Java Platform API Specification
java.specification.vendor Oracle Corporation
java.specification.version 1.8
java.vendor Oracle Corporation
java.vendor.url http://java.oracle.com/
java.vendor.url.bug http://bugreport.sun.com/bugreport/
java.version 1.8.0_40
java.vm.info mixed mode
java.vm.name Java HotSpot(TM) 64-Bit Server VM
java.vm.specification.name Java Virtual Machine Specification
java.vm.specification.vendor Oracle Corporation
java.vm.specification.version 1.8
java.vm.vendor Oracle Corporation
java.vm.version 25.40-b25
line.separator
os.arch amd64
os.name Linux
os.version 2.6.32-431.el6.x86_64
path.separator :
sun.arch.data.model 64
sun.boot.class.path /usr/java/jdk1.8.0_40/jre/lib/resources.jar:/usr/java/jdk1.8.0_40/jre/lib/rt.jar:/usr/java/jdk1.8.0_40/jre/lib/sunrsasign.jar:/usr/java/jdk1.8.0_40/jre/lib/jsse.jar:/usr/java/jdk1.8.0_40/jre/lib/jce.jar:/usr/java/jdk1.8.0_40/jre/lib/charsets.jar:/usr/java/jdk1.8.0_40/jre/lib/jfr.jar:/usr/java/jdk1.8.0_40/jre/classes
sun.boot.library.path /usr/java/jdk1.8.0_40/jre/lib/amd64
sun.cpu.endian little
sun.cpu.isalist
sun.io.unicode.encoding UnicodeLittle
sun.java.command org.apache.spark.deploy.SparkSubmit sparkr-pre.R
sun.java.launcher SUN_STANDARD
sun.jnu.encoding UTF-8
sun.management.compiler HotSpot 64-Bit Tiered Compilers
sun.nio.ch.bugLevel
sun.os.patch.level unknown
user.country US
user.dir /home/user/R
user.home /home/user
user.language en
user.name inuser
user.timezone Asia/Colombo
Classpath Entries
Resource Source
/datas/spark-1.5.2-bin-hadoop2.6/conf/ System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/conf/yarn-conf/ System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/spark-assembly-1.5.2-hadoop2.6.0.jar System Classpath
spark-defaults.conf
# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.
# Example:
# spark.master spark://master:7077
# spark.eventLog.enabled true
# spark.eventLog.dir hdfs://namenode:8021/directory
# spark.serializer org.apache.spark.serializer.KryoSerializer
# spark.driver.memory 5g
# spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
#
spark.master yarn-client
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 100G
spark.executor.memory 14G
spark.sql.parquet.binaryAsString true
spark.kryoserializer.buffer.max 2000M
spark.driver.maxResultSize 8G
spark.akka.frameSize 1024
#spark.executor.instances 16
I cannot share the SparkR script publicly; I am really sorry about that. However, the code works perfectly whenever it needs less than 100 minutes to complete.
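Since the original script cannot be shared, here is a minimal, hypothetical SparkR 1.5-style sketch of the read-then-write pattern described above; the table name and output path are placeholders, not values from the actual job:

```r
library(SparkR)

# Initialize contexts (Spark 1.5.x API, matching spark.home above)
sc <- sparkR.init(appName = "SparkR")
hiveContext <- sparkRHive.init(sc)

# Read from a Hive/Impala Parquet-backed table (placeholder table name)
df <- sql(hiveContext, "SELECT * FROM some_parquet_table")

# ... transformations elided, as in the original question ...

# Write a new Parquet file with the same number of rows (placeholder path)
write.df(df, path = "hdfs:///tmp/output.parquet",
         source = "parquet", mode = "overwrite")

sparkR.stop()
```

The write.df call here is the same call that appears in the error output above (Calls: write.df -> ... -> invokeJava), which is where the job aborts at the 100-minute mark.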
Solution
-
==============================
1. This is a known bug in Spark 1.6.0 (see https://issues.apache.org/jira/browse/SPARK-12609). Reviewing the SparkR code shows that the bug has actually been present since Spark 1.4.0.
Until they release a fix, a quick-and-dirty workaround is to increase the timeout. As mentioned in that issue, the offending function is connectBackend. The function can be patched at runtime using assignInNamespace.
The following retrieves the original function, then wraps it in a second function that raises the timeout value to 48 hours. The original function is replaced by the wrapper.
connectBackend.orig <- getFromNamespace('connectBackend', pos = 'package:SparkR')
connectBackend.patched <- function(hostname, port, timeout = 3600 * 48) {
  connectBackend.orig(hostname, port, timeout)
}
assignInNamespace("connectBackend", value = connectBackend.patched, pos = 'package:SparkR')
Enter this code after loading the SparkR package.
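For clarity on ordering, a sketch (assuming the Spark 1.5/1.6 SparkR API) of where the patch sits relative to backend initialization — the patch must be in place before sparkR.init() opens the backend connection:

```r
library(SparkR)  # attach SparkR first so its namespace can be patched

# Patch the backend-connection timeout to 48 hours
connectBackend.orig <- getFromNamespace('connectBackend', pos = 'package:SparkR')
connectBackend.patched <- function(hostname, port, timeout = 3600 * 48) {
  connectBackend.orig(hostname, port, timeout)
}
assignInNamespace("connectBackend", value = connectBackend.patched, pos = 'package:SparkR')

# Only now initialize Spark, which triggers connectBackend
sc <- sparkR.init(appName = "SparkR")
```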
Another solution is to modify the timeout in the SparkR source and recompile. For how to compile it, see https://github.com/apache/spark/blob/branch-1.6/R/install-dev.sh.
-
==============================
2. Since Spark 2.1 there is an environment variable called SPARKR_BACKEND_CONNECTION_TIMEOUT that controls the timeout. However, its default is still set to 100 minutes. You can run longer jobs by setting, for example, SPARKR_BACKEND_CONNECTION_TIMEOUT=1209600 on the driver.
I thought that setting --conf spark.yarn.appMasterEnv.SPARKR_BACKEND_CONNECTION_TIMEOUT=1209600 in spark-submit would do the trick, but apparently it did not set the variable correctly. So my current workaround is to include this in the R script being run:
Sys.setenv("SPARKR_BACKEND_CONNECTION_TIMEOUT" = 1209600)
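Putting the pieces together, a hedged sketch of how this might look inside the script: the variable should be set before the SparkR backend connection is established, since the timeout is read when the connection is created (1209600 seconds is 14 days; the app name is a placeholder):

```r
# Set before SparkR connects to the JVM backend
Sys.setenv("SPARKR_BACKEND_CONNECTION_TIMEOUT" = 1209600)  # 14 days

library(SparkR)
sparkR.session(appName = "long-running-job")  # Spark 2.x entry point
```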
from https://stackoverflow.com/questions/34584284/sparkr-job-100-minutes-timeout by cc-by-sa and MIT license