[SCALA] 때문에 긴 RDD 리니지에 유래
SCALA때문에 긴 RDD 리니지에 유래
나는 HDFS에 작은 파일의 수천이있다. (수천에 다시입니다) 파일의 약간 작은 부분 집합의 fileList 처리 할 필요가 filepaths 목록에 포함을 처리해야합니다.
// fileList == list of filepaths in HDFS
var masterRDD: org.apache.spark.rdd.RDD[(String, String)] = sparkContext.emptyRDD
for (i <- 0 to fileList.size() - 1) {
val filePath = fileStatus.get(i)
val fileRDD = sparkContext.textFile(filePath)
val sampleRDD = fileRDD.filter(line => line.startsWith("#####")).map(line => (filePath, line))
masterRDD = masterRDD.union(sampleRDD)
}
masterRDD.first()
일단 루프 밖으로 유래 오류 조치 결과를 수행하기 때문에 RDD 긴 혈통 //
Exception in thread "main" java.lang.StackOverflowError
at scala.runtime.AbstractFunction1.<init>(AbstractFunction1.scala:12)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.<init>(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
=====================================================================
=====================================================================
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
해결법
-
==============================
1.일반적으로 당신은 긴 계통을 깰 체크 포인트를 사용할 수 있습니다. 이 몇 가지 더 많거나 적은 유사한 작업을해야합니다 :
일반적으로 당신은 긴 계통을 깰 체크 포인트를 사용할 수 있습니다. 이 몇 가지 더 많거나 적은 유사한 작업을해야합니다 :
import org.apache.spark.rdd.RDD import scala.reflect.ClassTag val checkpointInterval: Int = ??? def loadAndFilter(path: String) = sc.textFile(path) .filter(_.startsWith("#####")) .map((path, _)) def mergeWithLocalCheckpoint[T: ClassTag](interval: Int) (acc: RDD[T], xi: (RDD[T], Int)) = { if(xi._2 % interval == 0 & xi._2 > 0) xi._1.union(acc).localCheckpoint else xi._1.union(acc) } val zero: RDD[(String, String)] = sc.emptyRDD[(String, String)] fileList.map(loadAndFilter).zipWithIndex .foldLeft(zero)(mergeWithLocalCheckpoint(checkpointInterval))
이 특정 상황에서 훨씬 더 간단한 해결책은 SparkContext.union 방법을 사용하는 것 :
val masterRDD = sc.union( fileList.map(path => sc.textFile(path) .filter(_.startsWith("#####")) .map((path, _))) )
당신은 루프에 의해 생성 된 DAG를 살펴 때 이러한 방법의 차이가 분명해야 / 감소 :
단일 노동 조합 :
물론 파일은 flatMap와 wholeTextFiles을 결합하여 한 번에 모든 파일을 읽을 수있는 작은 경우 :
sc.wholeTextFiles(fileList.mkString(",")) .flatMap{case (path, text) => text.split("\n").filter(_.startsWith("#####")).map((path, _))}
from https://stackoverflow.com/questions/34461804/stackoverflow-due-to-long-rdd-lineage by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 어떻게 스파크 SQL에서 UDF를에 추가 매개 변수를 전달할 수 있습니다? (0) | 2019.11.02 |
---|---|
[SCALA] 스칼라 2.10에 반사를 통해 유형 매개 변수를 찾기? (0) | 2019.11.02 |
[SCALA] 규모 :지도 병합 (0) | 2019.11.02 |
[SCALA] 어떻게 벡터의 열을 합계를 사용자 정의 집계 함수를 정의? (0) | 2019.11.02 |
[SCALA] 새로운 스칼라 반사 API와 동반자 객체의 인스턴스를 가져옵니다 (0) | 2019.11.02 |