[SCALA] 나는 RDD의 아파치 스파크에서 반복 어떻게 (스칼라)
SCALA나는 RDD의 아파치 스파크에서 반복 어떻게 (스칼라)
나는이 문자열 [ "파일 이름", "내용"] 포함하는 배열의 무리와 함께 RDD을 채우기 위해 다음과 같은 명령을 사용합니다.
지금은 모든 파일 이름과 내용으로 뭔가를 그 발생의 모든을 반복하고 싶습니다.
val someRDD = sc.wholeTextFiles("hdfs://localhost:8020/user/cloudera/*")
그러나 나는이 작업을 수행하는 방법에 대한 문서를 찾을 수 없습니다.
그래서 내가 원하는 것은 이것이다 :
foreach occurrence-in-the-rdd{
//do stuff with the array found on loccation n of the RDD
}
해결법
-
==============================
1.스파크의 기본적인 작업지도 및 필터입니다.
스파크의 기본적인 작업지도 및 필터입니다.
val txtRDD = someRDD filter { case(id, content) => id.endsWith(".txt") }
txtRDD는 이제 확장 "이 .txt"를 가진 파일이 포함됩니다
그리고 당신은 말씀을 원하는 경우에 당신이 말할 수있는 해당 파일을 계산
//split the documents into words in one long list val words = txtRDD flatMap { case (id,text) => text.split("\\s+") } // give each word a count of 1 val wordT = words map (x => (x,1)) //sum up the counts for each word val wordCount = wordsT reduceByKey((a, b) => a + b)
예를 들어, 스탠포드 coreNLP 도구와 같은 라이브러리를 사용하여 엔티티 인식을 명명 수행하려는 경우 - 당신은 당신이 수행 할 필요가 비싼 초기화가있을 때 mapPartitions를 사용하고 싶습니다.
마스터지도, 필터, flatMap 및 감소, 당신은 불꽃을 마스터에 당신의 방법에 잘있다.
-
==============================
2.당신은 매개 변수로 기능을 수락 RDD에 다양한 방법을 호출합니다.
당신은 매개 변수로 기능을 수락 RDD에 다양한 방법을 호출합니다.
// set up an example -- an RDD of arrays val sparkConf = new SparkConf().setMaster("local").setAppName("Example") val sc = new SparkContext(sparkConf) val testData = Array(Array(1,2,3), Array(4,5,6,7,8)) val testRDD = sc.parallelize(testData, 2) // Print the RDD of arrays. testRDD.collect().foreach(a => println(a.size)) // Use map() to create an RDD with the array sizes. val countRDD = testRDD.map(a => a.size) // Print the elements of this new RDD. countRDD.collect().foreach(a => println(a)) // Use filter() to create an RDD with just the longer arrays. val bigRDD = testRDD.filter(a => a.size > 3) // Print each remaining array. bigRDD.collect().foreach(a => { a.foreach(e => print(e + " ")) println() }) }
당신이 후자의 RDD을 만들 수 있도록 쓰기 기능이 어떤 일정한 유형의 단일 입력으로 RDD 요소 및 반환 데이터를 받아 들일 것을 알 수 있습니다. bigRDD 여전히 RDD [배열 [지능] 인 예를 들어, countRDD는 RDD [지능]이다.
아마 수정 다른 데이터가 있지만,이 질문과 답변에 설명 된 이유에 대해 저항해야하는 foreach 문을 작성하는 어떤 점에서 유혹한다.
편집 : 큰 RDDs를 인쇄하지 마십시오
몇몇 독자는 위의 예에서와 같이 결과를보고 () 수집 사용 ()와에 println에 대해 질문했다. 당신이 점화 REPL 같은 대화 형 모드에서 실행하는 경우 물론,이 경우에만 작동합니다 (읽기 평가 후면 인쇄 루프를.) 그것은 질서 인쇄를위한 순차적 인 배열을 얻기 위해 RDD ()를 수집 호출하는 것이 가장 좋습니다. 그러나 수집 () 너무 많은 데이터를 다시 가져올 수 있으며, 어떤 경우에 너무 많이 인쇄 할 수 있습니다. 여기에 그들이 큰이라면 RDDs에 대한 통찰력을 얻을 수있는 몇 가지 다른 방법은 다음과 같습니다 :
-
==============================
3.나는 파티션 매핑 기능을 만드는 사용을 시도 할 것입니다. RDD 전체 데이터 세트가 루프에서 처리 할 수있는 방법을 보여주는 아래의 코드는 그 각각의 입력은 완전히 동일한 기능을 통해 진행한다. 나는 스칼라에 대한 지식이없는 두려워, 그래서이 제공하는 모든 자바 코드입니다. 그러나, 스칼라로 번역하기가 매우 어렵 안된다.
나는 파티션 매핑 기능을 만드는 사용을 시도 할 것입니다. RDD 전체 데이터 세트가 루프에서 처리 할 수있는 방법을 보여주는 아래의 코드는 그 각각의 입력은 완전히 동일한 기능을 통해 진행한다. 나는 스칼라에 대한 지식이없는 두려워, 그래서이 제공하는 모든 자바 코드입니다. 그러나, 스칼라로 번역하기가 매우 어렵 안된다.
JavaRDD<String> res = file.mapPartitions(new FlatMapFunction <Iterator<String> ,String>(){ @Override public Iterable<String> call(Iterator <String> t) throws Exception { ArrayList<String[]> tmpRes = new ArrayList <>(); String[] fillData = new String[2]; fillData[0] = "filename"; fillData[1] = "content"; while(t.hasNext()){ tmpRes.add(fillData); } return Arrays.asList(tmpRes); } }).cache();
-
==============================
4.무엇 wholeTextFiles 반환하는 것은 쌍 RDD입니다 :
무엇 wholeTextFiles 반환하는 것은 쌍 RDD입니다 :
다음은 다음 로컬 경로에서 파일을 읽을 때마다 파일 이름과 내용을 인쇄의 예입니다.
val conf = new SparkConf().setAppName("scala-test").setMaster("local") val sc = new SparkContext(conf) sc.wholeTextFiles("file:///Users/leon/Documents/test/") .collect .foreach(t => println(t._1 + ":" + t._2));
결과:
file:/Users/leon/Documents/test/1.txt:{"name":"tom","age":12} file:/Users/leon/Documents/test/2.txt:{"name":"john","age":22} file:/Users/leon/Documents/test/3.txt:{"name":"leon","age":18}
또는 먼저 RDD로 변환 쌍 RDD
sc.wholeTextFiles("file:///Users/leon/Documents/test/") .map(t => t._2) .collect .foreach { x => println(x)}
결과:
{"name":"tom","age":12} {"name":"john","age":22} {"name":"leon","age":18}
그리고 wholeTextFiles 작은 파일에 대한 더 준수라고 생각합니다.
-
==============================
5.
for (element <- YourRDD) { // do what you want with element in each iteration, and if you want the index of element, simply use a counter variable in this loop beginning from 0 println (element._1) // this will print all filenames }
from https://stackoverflow.com/questions/25914789/how-do-i-iterate-rdds-in-apache-spark-scala by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 스칼라 일반적인 방법 - T 없음 ClassTag 가능 (0) | 2019.11.08 |
---|---|
[SCALA] 어떻게 스칼라 가변 인자 방법으로 스칼라 배열을 전달하는? (0) | 2019.11.08 |
[SCALA] 스칼라 2.8 CanBuildFrom (0) | 2019.11.08 |
[SCALA] 스칼라 함수 리터럴은 무엇인가? (0) | 2019.11.08 |
[SCALA] 무엇 * 너무 *의 경우 클래스 상속 문제? (0) | 2019.11.08 |