복붙노트

[SCALA] 때문에 긴 RDD 리니지에 유래

SCALA

때문에 긴 RDD 리니지에 유래

나는 HDFS에 작은 파일의 수천이있다. (수천에 다시입니다) 파일의 약간 작은 부분 집합의 fileList 처리 할 필요가 filepaths 목록에 포함을 처리해야합니다.

// fileList == list of filepaths in HDFS

var masterRDD: org.apache.spark.rdd.RDD[(String, String)] = sparkContext.emptyRDD

for (i <- 0 to fileList.size() - 1) {

val filePath = fileStatus.get(i)
val fileRDD = sparkContext.textFile(filePath)
val sampleRDD = fileRDD.filter(line => line.startsWith("#####")).map(line => (filePath, line)) 

masterRDD = masterRDD.union(sampleRDD)

}

masterRDD.first()

일단 루프 밖으로 유래 오류 조치 결과를 수행하기 때문에 RDD 긴 혈통 //

Exception in thread "main" java.lang.StackOverflowError
    at scala.runtime.AbstractFunction1.<init>(AbstractFunction1.scala:12)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.<init>(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
    =====================================================================
    =====================================================================
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

해결법

  1. ==============================

    1.일반적으로 당신은 긴 계통을 깰 체크 포인트를 사용할 수 있습니다. 이 몇 가지 더 많거나 적은 유사한 작업을해야합니다 :

    일반적으로 당신은 긴 계통을 깰 체크 포인트를 사용할 수 있습니다. 이 몇 가지 더 많거나 적은 유사한 작업을해야합니다 :

    import org.apache.spark.rdd.RDD
    import scala.reflect.ClassTag
    
    val checkpointInterval: Int = ???
    
    def loadAndFilter(path: String) = sc.textFile(path)
      .filter(_.startsWith("#####"))
      .map((path, _))
    
    def mergeWithLocalCheckpoint[T: ClassTag](interval: Int)
      (acc: RDD[T], xi: (RDD[T], Int)) = {
        if(xi._2 % interval == 0 & xi._2 > 0) xi._1.union(acc).localCheckpoint
        else xi._1.union(acc)
      }
    
    val zero: RDD[(String, String)] = sc.emptyRDD[(String, String)]
    fileList.map(loadAndFilter).zipWithIndex
      .foldLeft(zero)(mergeWithLocalCheckpoint(checkpointInterval))
    

    이 특정 상황에서 훨씬 더 간단한 해결책은 SparkContext.union 방법을 사용하는 것 :

    val masterRDD = sc.union(
      fileList.map(path => sc.textFile(path)
        .filter(_.startsWith("#####"))
        .map((path, _))) 
    )
    

    당신은 루프에 의해 생성 된 DAG를 살펴 때 이러한 방법의 차이가 분명해야 / 감소 :

    단일 노동 조합 :

    물론 파일은 flatMap와 wholeTextFiles을 결합하여 한 번에 모든 파일을 읽을 수있는 작은 경우 :

    sc.wholeTextFiles(fileList.mkString(","))
      .flatMap{case (path, text) =>  
        text.split("\n").filter(_.startsWith("#####")).map((path, _))}
    
  2. from https://stackoverflow.com/questions/34461804/stackoverflow-due-to-long-rdd-lineage by cc-by-sa and MIT license