[SCALA] 스칼라 스파크 : 분할 수집 여러 RDD로?
SCALA스칼라 스파크 : 분할 수집 여러 RDD로?
일부 creteria에 따라 여러 RDDs로 컬렉션을 분할 할 수있는 스파크 기능이 있습니까? 이러한 기능이 과도 itteration를 방지 할 수있다. 예를 들면 :
def main(args: Array[String]) {
val logFile = "file.txt"
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile, 2).cache()
val lineAs = logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt")
val lineBs = logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt")
}
이 예에서 나는 두 별도의 파일에 쓰기 결과를 두 번 반복 처리 'logData`에 있습니다 :
val lineAs = logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt")
val lineBs = logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt")
그것은이 같은 뭔가를 대신 좋은 일 것입니다 :
val resultMap = logData.map(line => if line.contains("a") ("a", line) else if line.contains("b") ("b", line) else (" - ", line)
resultMap.writeByKey("a", "linesA.txt")
resultMap.writeByKey("b", "linesB.txt")
그러한 일이?
해결법
-
==============================
1.다음과 같은 질문에서보세요.
다음과 같은 질문에서보세요.
하나 스파크 작업 - 키 스파크에 의한 다중 출력에 쓰기
당신은 다음과 같은 기능을 가진 RDD을 flatMap 다음 키에 GROUPBY 할 수 있습니다.
def multiFilter(words:List[String], line:String) = for { word <- words; if line.contains(word) } yield { (word,line) } val filterWords = List("a","b") val filteredRDD = logData.flatMap( line => multiFilter(filterWords, line) ) val groupedRDD = filteredRDD.groupBy(_._1)
GROUPBY 작업 중 하나가 셔플을 포함하기 때문에하지만 당신의 입력 RDD의 크기에 따라 당신은 또는 성능 향상을 볼 수 없습니다.
당신이 입력 RDD을 캐시 할 수 있습니다 당신이 생각하는, 따라서 실행하는 여러 필터 작업 비용으로하지 않을 수 있습니다 클러스터 반면에 당신은 당신의 불꽃에 메모리가 부족합니다.
-
==============================
2.어쩌면이 같은 작동합니다 :
어쩌면이 같은 작동합니다 :
def singlePassMultiFilter[T]( rdd: RDD[T], f1: T => Boolean, f2: T => Boolean, level: StorageLevel = StorageLevel.MEMORY_ONLY ): (RDD[T], RDD[T], Boolean => Unit) = { val tempRDD = rdd mapPartitions { iter => val abuf1 = ArrayBuffer.empty[T] val abuf2 = ArrayBuffer.empty[T] for (x <- iter) { if (f1(x)) abuf1 += x if (f2(x)) abuf2 += x } Iterator.single((abuf1, abuf2)) } tempRDD.persist(level) val rdd1 = tempRDD.flatMap(_._1) val rdd2 = tempRDD.flatMap(_._2) (rdd1, rdd2, (blocking: Boolean) => tempRDD.unpersist(blocking)) }
rdd1 (RESP. rdd2) 호출 작업이 tempRDD가 발생할 수 있음을 참고 계산 및 지속된다. 이것은 내가 생각이다 rdd1 및 rdd2의 정의에 flatMap의 오버 헤드 때문에 rdd2 (RESP. rdd1)을 계산 꽤 무시할 될 것 실제적으로 동일합니다.
당신과 같이 singlePassMultiFitler을 사용합니다 :
val (rdd1, rdd2, cleanUp) = singlePassMultiFilter(rdd, f1, f2) rdd1.persist() //I'm going to need `rdd1` more later... println(rdd1.count) println(rdd2.count) cleanUp(true) //I'm done with `rdd2` and `rdd1` has been persisted so free stuff up... println(rdd1.distinct.count)
분명이 필터의 임의의 수, 필터의 컬렉션 등으로 확장 할 수
from https://stackoverflow.com/questions/27231524/scala-spark-split-collection-into-several-rdd by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 우편 번호에 여러 시퀀스 (0) | 2019.11.23 |
---|---|
[SCALA] 슬라이스 배열 방법 및 열의 합 요소? (0) | 2019.11.23 |
[SCALA] 스칼라는 : 어떻게 어떤 경우 클래스의 추상 복사 가능한 슈퍼 클래스를 정의? (0) | 2019.11.23 |
[SCALA] 어떻게 스칼라에서 일반적인 방법으로 형질의 인스턴스를 만들려면 어떻게해야합니까? (0) | 2019.11.23 |
[SCALA] ClassTag 기초하여 패턴 매칭이 실패 프리미티브 (0) | 2019.11.23 |