복붙노트

[SCALA] 스칼라 스파크 : 분할 수집 여러 RDD로?

SCALA

스칼라 스파크 : 분할 수집 여러 RDD로?

일부 creteria에 따라 여러 RDDs로 컬렉션을 분할 할 수있는 스파크 기능이 있습니까? 이러한 기능이 과도 itteration를 방지 할 수있다. 예를 들면 :

def main(args: Array[String]) {
    val logFile = "file.txt" 
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val lineAs = logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt")
    val lineBs = logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt")
  }

이 예에서 나는 두 별도의 파일에 쓰기 결과를 두 번 반복 처리 'logData`에 있습니다 :

    val lineAs = logData.filter(line => line.contains("a")).saveAsTextFile("linesA.txt")
    val lineBs = logData.filter(line => line.contains("b")).saveAsTextFile("linesB.txt")

그것은이 같은 뭔가를 대신 좋은 일 것입니다 :

    val resultMap = logData.map(line => if line.contains("a") ("a", line) else if line.contains("b") ("b", line) else (" - ", line)
    resultMap.writeByKey("a", "linesA.txt") 
    resultMap.writeByKey("b", "linesB.txt")

그러한 일이?

해결법

  1. ==============================

    1.다음과 같은 질문에서보세요.

    다음과 같은 질문에서보세요.

    하나 스파크 작업 - 키 스파크에 의한 다중 출력에 쓰기

    당신은 다음과 같은 기능을 가진 RDD을 flatMap 다음 키에 GROUPBY 할 수 있습니다.

    def multiFilter(words:List[String], line:String) = for { word <- words; if line.contains(word) } yield { (word,line) }
    val filterWords = List("a","b")
    val filteredRDD = logData.flatMap( line => multiFilter(filterWords, line) ) 
    val groupedRDD = filteredRDD.groupBy(_._1) 
    

    GROUPBY 작업 중 하나가 셔플을 포함하기 때문에하지만 당신의 입력 RDD의 크기에 따라 당신은 또는 성능 향상을 볼 수 없습니다.

    당신이 입력 RDD을 캐시 할 수 있습니다 당신이 생각하는, 따라서 실행하는 여러 필터 작업 비용으로하지 않을 수 있습니다 클러스터 반면에 당신은 당신의 불꽃에 메모리가 부족합니다.

  2. ==============================

    2.어쩌면이 같은 작동합니다 :

    어쩌면이 같은 작동합니다 :

    def singlePassMultiFilter[T](
          rdd: RDD[T],
          f1: T => Boolean,
          f2: T => Boolean,
          level: StorageLevel = StorageLevel.MEMORY_ONLY
      ): (RDD[T], RDD[T], Boolean => Unit) = {
      val tempRDD = rdd mapPartitions { iter =>
        val abuf1 = ArrayBuffer.empty[T]
        val abuf2 = ArrayBuffer.empty[T]
        for (x <- iter) {
          if (f1(x)) abuf1 += x
          if (f2(x)) abuf2 += x
        }
        Iterator.single((abuf1, abuf2))
      }
      tempRDD.persist(level)
      val rdd1 = tempRDD.flatMap(_._1)
      val rdd2 = tempRDD.flatMap(_._2)
      (rdd1, rdd2, (blocking: Boolean) => tempRDD.unpersist(blocking))
    }
    

    rdd1 (RESP. rdd2) 호출 작업이 tempRDD가 발생할 수 있음을 참고 계산 및 지속된다. 이것은 내가 생각이다 rdd1 및 rdd2의 정의에 flatMap의 오버 헤드 때문에 rdd2 (RESP. rdd1)을 계산 꽤 무시할 될 것 실제적으로 동일합니다.

    당신과 같이 singlePassMultiFitler을 사용합니다 :

    val (rdd1, rdd2, cleanUp) = singlePassMultiFilter(rdd, f1, f2)
    rdd1.persist()    //I'm going to need `rdd1` more later...
    println(rdd1.count)  
    println(rdd2.count) 
    cleanUp(true)     //I'm done with `rdd2` and `rdd1` has been persisted so free stuff up...
    println(rdd1.distinct.count)
    

    분명이 필터의 임의의 수, 필터의 컬렉션 등으로 확장 할 수

  3. from https://stackoverflow.com/questions/27231524/scala-spark-split-collection-into-several-rdd by cc-by-sa and MIT license