[SCALA] "[삼백초] 후 시간 초과 선물 java.util.concurrent.TimeoutException"왜 실패에 가입합니까?
SCALA"[삼백초] 후 시간 초과 선물 java.util.concurrent.TimeoutException"왜 실패에 가입합니까?
나는 스파크 1.5을 사용하고 있습니다.
나는 형태의 두 dataframes 있습니다 :
scala> libriFirstTable50Plus3DF
res1: org.apache.spark.sql.DataFrame = [basket_id: string, family_id: int]
scala> linkPersonItemLessThan500DF
res2: org.apache.spark.sql.DataFrame = [person_id: int, family_id: int]
linkPersonItemLessThan500DF는 26,694,353 기록을 가지고있는 동안 libriFirstTable50Plus3DF는 766,151 기록을 가지고있다. 참고 나는이 두 나중에 가입하려는부터 내가 linkPersonItemLessThan500DF에 다시 분할 (번호)를 사용하고있다. 내가 가진 위의 코드를 다음입니다 :
val userTripletRankDF = linkPersonItemLessThan500DF
.join(libriFirstTable50Plus3DF, Seq("family_id"))
.take(20)
.foreach(println(_))
있는 나는이 출력을 얻고있다 :
16/12/13 15:07:10 INFO scheduler.TaskSetManager: Finished task 172.0 in stage 3.0 (TID 473) in 520 ms on mlhdd01.mondadori.it (199/200)
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala: at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:63)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:190)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1315)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1378)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:77)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:79)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:81)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:83)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:85)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:87)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:91)
at $iwC$$iwC$$iwC.<init>(<console>:93)
at $iwC$$iwC.<init>(<console>:95)
at $iwC.<init>(<console>:97)
at <init>(<console>:99)
at .<init>(<console>:103)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
나는 문제가 무엇인지 이해가 안 돼요. 그것은 대기 시간을 증가 단순하게인가? (가) 너무 많이 가입인가? 나는 더 많은 메모리가 필요하십니까? shufffling 집중인가? 누구든지 도움이 수 있습니까?
해결법
-
==============================
1.스파크가 방송 해시 가입 할 시도하고 DataFrames 중 하나가 그래서 많은 시간을 소모 전송, 매우 크기 때문에이 문제가 발생합니다.
스파크가 방송 해시 가입 할 시도하고 DataFrames 중 하나가 그래서 많은 시간을 소모 전송, 매우 크기 때문에이 문제가 발생합니다.
당신은 할 수 있습니다 :
다음과 같은 방식으로 스파크 컨텍스트를 빌드 할 때 PySpark에서는 설정을 설정할 수 있습니다 :
spark = SparkSession .builder .appName("Your App") .config("spark.sql.broadcastTimeout", "36000") .getOrCreate()
-
==============================
2.그냥 @T에서 매우 간결한 대답에 약간의 코드 컨텍스트를 추가 할 수 있습니다. Gawęda.
그냥 @T에서 매우 간결한 대답에 약간의 코드 컨텍스트를 추가 할 수 있습니다. Gawęda.
당신의 스파크 응용 프로그램에서 스파크 SQL 덜 소위 방송 임계 값보다 (10메가바이트 기본값) 우연히 "libriFirstTable50Plus3DF는 766,151 레코드가"때문에 가입에 대한 방송 해시 조인을 선택했다.
당신은 spark.sql.autoBroadcastJoinThreshold 구성 속성을 사용하여 방송 임계 값을 제어 할 수 있습니다.
당신은 스택 추적에 참여의 특정 유형을 찾을 수 있습니다 :
스파크 SQL에서 BroadcastHashJoin 물리 연산자는 (오히려 모든 작업과 그것의 사본을 운송보다) 스파크 집행에 작은 데이터 집합을 배포 할 방송 변수를 사용합니다.
당신은 실제 쿼리 계획을 검토 할 설명을 사용하면 쿼리 사용 BroadcastExchangeExec 물리 연산자를 알 것입니다. 당신은 작은 테이블 (시간 초과)을 방송에 대한 기본 기계를 볼 수있는 곳이다.
override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = { ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]] }
doExecuteBroadcast는 SQL 그 다음 불꽃의 모든 물리 연산자가 필요한 경우 방송에 대한 허용 SparkPlan 계약의 일부입니다. BroadcastExchangeExec 그것을 필요로 발생합니다.
시간 초과 매개 변수는 당신을 위해 무엇을 찾고 있습니다.
private val timeout: Duration = { val timeoutValue = sqlContext.conf.broadcastTimeout if (timeoutValue < 0) { Duration.Inf } else { timeoutValue.seconds } }
당신은 방송 변수가 무기한 집행에 배송 할 때까지 기다리거나 정확히 spark.sql.broadcastTimeout 구성 속성입니다 sqlContext.conf.broadcastTimeout을 사용하는 의미 것 (음의 값을 사용)를 완전히 비활성화 할 수 있습니다 볼 수 있듯이. 기본값은 스택 트레이스에서 볼 수있는 5 * 60 초입니다 :
-
==============================
3.내 경우에는, 그것은 큰 dataframe에 브로드 캐스트에 의해 발생했다 :
내 경우에는, 그것은 큰 dataframe에 브로드 캐스트에 의해 발생했다 :
df.join(broadcast(largeDF))
그래서, 이전의 응답에 따라, 나는 방송을 제거하여 고정 :
df.join(largeDF)
-
==============================
4.spark.sql.broadcastTimeout 증가 또는 유지 () 둘 DataFrames 외에도
spark.sql.broadcastTimeout 증가 또는 유지 () 둘 DataFrames 외에도
당신은 시도 할 수 있습니다 :
spark.sql.autoBroadcastJoinThreshold로 설정함으로써 방송 1.disable -1
더 높은 값으로 설정하여 spark.driver.memory 스파크 드라이버 메모리 2.increase.
from https://stackoverflow.com/questions/41123846/why-does-join-fail-with-java-util-concurrent-timeoutexception-futures-timed-ou by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 어떻게 스칼라 / SBT / SLF4J 프로젝트에서 평민 로깅을 제외하려면? (0) | 2019.11.19 |
---|---|
[SCALA] 왜 기능은 [-A1, ..., + B]되지 매개 변수와 같은 슈퍼 타입을 허용 어떻습니까? (0) | 2019.11.19 |
[SCALA] 어떻게 혼합 된합니다 인스턴스에 대한 특성? (0) | 2019.11.19 |
[SCALA] reduceByKey : 어떻게 내부적으로 작동합니까? (0) | 2019.11.19 |
[SCALA] 스파크 - 응용 프로그램을 제출할 때 오류 "A 마스터 URL이 구성에서 설정해야합니다" (0) | 2019.11.19 |