복붙노트

[SCALA] 스파크에서 DataFrame 평가를 강제하는 방법

SCALA

스파크에서 DataFrame 평가를 강제하는 방법

때때로 (예를 들어, 테스트 및 벤치마킹을 위해) 나는 DataFrame에 정의 된 변환의 실행을 강제 할 수 있습니다. 모든 열이 실제로 계산되는 것을 보장하지 않는 수가 같은 조치를 호출 AFAIK는, 쇼는 모든 행의 부분 집합을 (아래 예 참조) 계산할 수

내 솔루션 df.write.saveAsTable를 사용하여 HDFS에 DataFrame를 작성하는 것입니다,하지만 테이블이 "클러 터"내 시스템은 내가 더 이상을 유지하고 싶지 않아요.

그래서 DataFrame의 평가를 유발하는 가장 좋은 방법은 무엇입니까?

편집하다:

스파크 개발자 목록 최근 논의도 있다는 것을 참고 : http://apache-spark-developers-list.1001551.n3.nabble.com/Will-count-always-trigger-an-evaluation-of-each- 행 td21018.html

I 작은 예 DataFrame 의지하지 않는 모든 것을 보여준다 평가했다 (점화 지역 및 1.6.3 스파크 마스터 =하여 시험을 [2]) :

val df = sc.parallelize(Seq(1)).toDF("id")
val myUDF = udf((i:Int) => {throw new RuntimeException;i})

df.withColumn("test",myUDF($"id")).count // runs fine
df.withColumn("test",myUDF($"id")).show() // gives Exception

쇼가 모든 행을 평가하지 않습니다 여기에, 예를 같은 논리를 사용하여 :

val df = sc.parallelize(1 to 10).toDF("id")
val myUDF = udf((i:Int) => {if(i==10) throw new RuntimeException;i})

df.withColumn("test",myUDF($"id")).show(5) // runs fine
df.withColumn("test",myUDF($"id")).show(10) // gives Exception

2 편집 : Eliasah의 경우 : 예외이 말한다 :

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 6, localhost): java.lang.RuntimeException
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcII$sp(<console>:68)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:68)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:68)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
.
.
.
.

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1499)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1506)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1376)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2100)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1375)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1457)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
.
.
.
.

해결법

  1. ==============================

    1.단순히 DataFrame에서 기본 RDD를 받고 그것에 행동은 당신이 찾고있는 무엇을 달성해야 트리거 같아요.

    단순히 DataFrame에서 기본 RDD를 받고 그것에 행동은 당신이 찾고있는 무엇을 달성해야 트리거 같아요.

    df.withColumn("test",myUDF($"id")).rdd.count // this gives proper exceptions
    
  2. ==============================

    2.그것은 조금 늦게, 그러나 여기 근본적인 이유 : 카운트가 RDD 및 DataFrame에서 동일하게 행동하지 않습니다.

    그것은 조금 늦게, 그러나 여기 근본적인 이유 : 카운트가 RDD 및 DataFrame에서 동일하게 행동하지 않습니다.

    어떤 경우에는 당신이 실제로 가지고있는 요소의 수를 알고 데이터를로드하는 데 필요하지 않기 때문에 (특히 당신의 경우를 참여 셔플 데이터가 없습니다 경우) DataFrames에 최적화있다. 따라서 DataFrame는 카운트가 모든 데이터를로드 할 수 없습니다라고하여 예외 던지기에 통과하지 때 구체화. 당신은 쉽게 자신의 DefaultSource과의 관계를 정의하여 실험을하고 (A DataFrame에 수를 호출하면 항상 상관없이 선택했다 얼마나 많은 열이없는 requiredColumns와 buildScan 방법에서 결국 없다는 것을 참조 org.apache.spark.sql를 볼 수 있습니다 .sources.interfaces는) 더 이해합니다. 실제로 매우 효율적인 최적화입니다 ;-)

    하지만 RDDs에서, 이러한 최적화가 없다 (즉, 하나는 항상 가능한 경우 DataFrames를 사용하려고해야하는 이유의). 따라서 RDD의 카운트 모두 혈통을 실행하고 모든 파티션을 구성하는 반복자 모든 크기의 합을 반환한다.

    dataframe.count를 호출하면 첫번째 설명에 간다, 그러나 당신이 빌드 귀하의 DataFrame의 밖으로 RDD처럼 호출 dataframe.rdd.count 두 번째로 들어갑니다. 호출이 dataframe.cache (). 당신이 결과를 캐시하는 불꽃을 필요에 따라 힘을 구체화 할 dataframe을 계산 (따라서이 모든 데이터를로드하고 변환 할 필요가) 있습니다. 그러나 데이터를 캐시의 부작용을 가지고 ...

  3. ==============================

    3.df.cache.count이 길을 가야하는 것 같습니다 :

    df.cache.count이 길을 가야하는 것 같습니다 :

    scala> val myUDF = udf((i:Int) => {if(i==1000) throw new RuntimeException;i})
    myUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(IntegerType)))
    
    scala> val df = sc.parallelize(1 to 1000).toDF("id")
    df: org.apache.spark.sql.DataFrame = [id: int]
    
    scala> df.withColumn("test",myUDF($"id")).show(10)
    [rdd_51_0]
    +---+----+
    | id|test|
    +---+----+
    |  1|   1|
    |  2|   2|
    |  3|   3|
    |  4|   4|
    |  5|   5|
    |  6|   6|
    |  7|   7|
    |  8|   8|
    |  9|   9|
    | 10|  10|
    +---+----+
    only showing top 10 rows
    
    scala> df.withColumn("test",myUDF($"id")).count
    res13: Long = 1000
    
    scala> df.withColumn("test",myUDF($"id")).cache.count
    org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (int) => int)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    .
    .
    .
    Caused by: java.lang.RuntimeException
    

    출처

  4. ==============================

    4.내가) (df.save.parquet 사용하는 것을 선호합니다. 이것은 않습니다 추가 디스크 I / 당신이 추정하고 나중에 빼기,하지만 당신은 불꽃이 예상 각 단계를 수행하고 게으른 평가와 함께 당신을 속일하지 않았다 긍정적 수있는 시간 오.

    내가) (df.save.parquet 사용하는 것을 선호합니다. 이것은 않습니다 추가 디스크 I / 당신이 추정하고 나중에 빼기,하지만 당신은 불꽃이 예상 각 단계를 수행하고 게으른 평가와 함께 당신을 속일하지 않았다 긍정적 수있는 시간 오.

  5. from https://stackoverflow.com/questions/42714291/how-to-force-dataframe-evaluation-in-spark by cc-by-sa and MIT license