복붙노트

[SCALA] 나는 RDD의 아파치 스파크에서 반복 어떻게 (스칼라)

SCALA

나는 RDD의 아파치 스파크에서 반복 어떻게 (스칼라)

나는이 문자열 [ "파일 이름", "내용"] 포함하는 배열의 무리와 함께 RDD을 채우기 위해 다음과 같은 명령을 사용합니다.

지금은 모든 파일 이름과 내용으로 뭔가를 그 발생의 모든을 반복하고 싶습니다.

val someRDD = sc.wholeTextFiles("hdfs://localhost:8020/user/cloudera/*")

그러나 나는이 작업을 수행하는 방법에 대한 문서를 찾을 수 없습니다.

그래서 내가 원하는 것은 이것이다 :

foreach occurrence-in-the-rdd{
   //do stuff with the array found on loccation n of the RDD
} 

해결법

  1. ==============================

    1.스파크의 기본적인 작업지도 및 필터입니다.

    스파크의 기본적인 작업지도 및 필터입니다.

    val txtRDD = someRDD filter { case(id, content) => id.endsWith(".txt") }
    

    txtRDD는 이제 확장 "이 .txt"를 가진 파일이 포함됩니다

    그리고 당신은 말씀을 원하는 경우에 당신이 말할 수있는 해당 파일을 계산

    //split the documents into words in one long list
    val words = txtRDD flatMap { case (id,text) => text.split("\\s+") }
    // give each word a count of 1
    val wordT = words map (x => (x,1))  
    //sum up the counts for each word
    val wordCount = wordsT reduceByKey((a, b) => a + b)
    

    예를 들어, 스탠포드 coreNLP 도구와 같은 라이브러리를 사용하여 엔티티 인식을 명명 수행하려는 경우 - 당신은 당신이 수행 할 필요가 비싼 초기화가있을 때 mapPartitions를 사용하고 싶습니다.

    마스터지도, 필터, flatMap 및 감소, 당신은 불꽃을 마스터에 당신의 방법에 잘있다.

  2. ==============================

    2.당신은 매개 변수로 기능을 수락 RDD에 다양한 방법을 호출합니다.

    당신은 매개 변수로 기능을 수락 RDD에 다양한 방법을 호출합니다.

    // set up an example -- an RDD of arrays
    val sparkConf = new SparkConf().setMaster("local").setAppName("Example")
    val sc = new SparkContext(sparkConf)
    val testData = Array(Array(1,2,3), Array(4,5,6,7,8))
    val testRDD = sc.parallelize(testData, 2)
    
    // Print the RDD of arrays.
    testRDD.collect().foreach(a => println(a.size))
    
    // Use map() to create an RDD with the array sizes.
    val countRDD = testRDD.map(a => a.size)
    
    // Print the elements of this new RDD.
    countRDD.collect().foreach(a => println(a))
    
    // Use filter() to create an RDD with just the longer arrays.
    val bigRDD = testRDD.filter(a => a.size > 3)
    
    // Print each remaining array.
    bigRDD.collect().foreach(a => {
        a.foreach(e => print(e + " "))
        println()
      })
    }
    

    당신이 후자의 RDD을 만들 수 있도록 쓰기 기능이 어떤 일정한 유형의 단일 입력으로 RDD 요소 및 반환 데이터를 받아 들일 것을 알 수 있습니다. bigRDD 여전히 RDD [배열 [지능] 인 예를 들어, countRDD는 RDD [지능]이다.

    아마 수정 다른 데이터가 있지만,이 질문과 답변에 설명 된 이유에 대해 저항해야하는 foreach 문을 작성하는 어떤 점에서 유혹한다.

    편집 : 큰 RDDs를 인쇄하지 마십시오

    몇몇 독자는 위의 예에서와 같이 결과를보고 () 수집 사용 ()와에 println에 대해 질문했다. 당신이 점화 REPL 같은 대화 형 모드에서 실행하는 경우 물론,이 경우에만 작동합니다 (읽기 평가 후면 인쇄 루프를.) 그것은 질서 인쇄를위한 순차적 인 배열을 얻기 위해 RDD ()를 수집 호출하는 것이 가장 좋습니다. 그러나 수집 () 너무 많은 데이터를 다시 가져올 수 있으며, 어떤 경우에 너무 많이 인쇄 할 수 있습니다. 여기에 그들이 큰이라면 RDDs에 대한 통찰력을 얻을 수있는 몇 가지 다른 방법은 다음과 같습니다 :

  3. ==============================

    3.나는 파티션 매핑 기능을 만드는 사용을 시도 할 것입니다. RDD 전체 데이터 세트가 루프에서 처리 할 수있는 방법을 보여주는 아래의 코드는 그 각각의 입력은 완전히 동일한 기능을 통해 진행한다. 나는 스칼라에 대한 지식이없는 두려워, 그래서이 제공하는 모든 자바 코드입니다. 그러나, 스칼라로 번역하기가 매우 어렵 안된다.

    나는 파티션 매핑 기능을 만드는 사용을 시도 할 것입니다. RDD 전체 데이터 세트가 루프에서 처리 할 수있는 방법을 보여주는 아래의 코드는 그 각각의 입력은 완전히 동일한 기능을 통해 진행한다. 나는 스칼라에 대한 지식이없는 두려워, 그래서이 제공하는 모든 자바 코드입니다. 그러나, 스칼라로 번역하기가 매우 어렵 안된다.

    JavaRDD<String> res = file.mapPartitions(new FlatMapFunction <Iterator<String> ,String>(){ 
          @Override
          public Iterable<String> call(Iterator <String> t) throws Exception {  
    
              ArrayList<String[]> tmpRes = new ArrayList <>();
              String[] fillData = new String[2];
    
              fillData[0] = "filename";
              fillData[1] = "content";
    
              while(t.hasNext()){
                   tmpRes.add(fillData);  
              }
    
              return Arrays.asList(tmpRes);
          }
    
    }).cache();
    
  4. ==============================

    4.무엇 wholeTextFiles 반환하는 것은 쌍 RDD입니다 :

    무엇 wholeTextFiles 반환하는 것은 쌍 RDD입니다 :

    다음은 다음 로컬 경로에서 파일을 읽을 때마다 파일 이름과 내용을 인쇄의 예입니다.

    val conf = new SparkConf().setAppName("scala-test").setMaster("local")
    val sc = new SparkContext(conf)
    sc.wholeTextFiles("file:///Users/leon/Documents/test/")
      .collect
      .foreach(t => println(t._1 + ":" + t._2));
    

    결과:

    file:/Users/leon/Documents/test/1.txt:{"name":"tom","age":12}
    
    file:/Users/leon/Documents/test/2.txt:{"name":"john","age":22}
    
    file:/Users/leon/Documents/test/3.txt:{"name":"leon","age":18}
    

    또는 먼저 RDD로 변환 쌍 RDD

    sc.wholeTextFiles("file:///Users/leon/Documents/test/")
      .map(t => t._2)
      .collect
      .foreach { x => println(x)}
    

    결과:

    {"name":"tom","age":12}
    
    {"name":"john","age":22}
    
    {"name":"leon","age":18}
    

    그리고 wholeTextFiles 작은 파일에 대한 더 준수라고 생각합니다.

  5. ==============================

    5.

    for (element <- YourRDD)
    {
         // do what you want with element in each iteration, and if you want the index of element, simply use a counter variable in this loop beginning from 0 
         println (element._1) // this will print all filenames
    }
    
  6. from https://stackoverflow.com/questions/25914789/how-do-i-iterate-rdds-in-apache-spark-scala by cc-by-sa and MIT license