[SCALA] 스파크에서 RDD에서 이웃 요소에서 작동
SCALA스파크에서 RDD에서 이웃 요소에서 작동
내가 컬렉션을 가지고 :
List(1, 3,-1, 0, 2, -4, 6)
그것은으로 분류되어 쉽게 만들 수 :
List(-4, -1, 0, 1, 2, 3, 6)
: 등이 추천에 0 - 3 - 3 - 2, 2 - 1,1- 그럼 연산 (6)에 의해 새로운 집합을 생성 할 수
for(i <- 0 to list.length -2) yield {
list(i + 1) - list(i)
}
와 벡터를 얻을 :
Vector(3, 1, 1, 1, 1, 3)
즉, 나는 다음 요소를 뺀 현재 요소를 만들고 싶어.
하지만 어떻게 스파크에 RDD에서 이것을 구현하는 방법?
나는 컬렉션 알고 :
List(-4, -1, 0, 1, 2, 3, 6)
컬렉션의 일부 파티션이있을 것이다, 각 파티션이 나는 각 파티션 함께 각 파티션 및 수집 결과에 유사한 작업을 수행 할 수 주문한?
해결법
-
==============================
1.가장 효율적인 해결 방법 슬라이딩 사용하는 것이다 :
가장 효율적인 해결 방법 슬라이딩 사용하는 것이다 :
import org.apache.spark.mllib.rdd.RDDFunctions._ val rdd = sc.parallelize(Seq(1, 3,-1, 0, 2, -4, 6)) .sortBy(identity) .sliding(2) .map{case Array(x, y) => y - x}
-
==============================
2.당신이 뭔가를 가지고 가정
당신이 뭔가를 가지고 가정
val seq = sc.parallelize(List(1, 3, -1, 0, 2, -4, 6)).sortBy(identity)
톤 토레스 같은 키를 제안하자 인덱스에 첫 컬렉션을 만들 수
val original = seq.zipWithIndex.map(_.swap)
이제 우리는 하나 개의 요소에 의해 이동 모음을 구축 할 수 있습니다.
val shifted = original.map { case (idx, v) => (idx - 1, v) }.filter(_._1 >= 0)
우리는 필요의 차이는 인덱스 내림차순으로 정렬 계산할 수 다음으로
val diffs = original.join(shifted) .sortBy(_._1, ascending = false) .map { case (idx, (v1, v2)) => v2 - v1 }
그래서
println(diffs.collect.toSeq)
쇼
WrappedArray(3, 1, 1, 1, 1, 3)
반전이 중요하지 않은 경우 참고는 sortBy 단계를 건너 뛸 수있다.
또한 지역의 수집이 훨씬 더 간단 같은 계산 할 수 있습니다 :
val elems = List(1, 3, -1, 0, 2, -4, 6).sorted (elems.tail, elems).zipped.map(_ - _).reverse
그러나 RDD 경우 압축 방법은 각 컬렉션은 각 파티션에 대해 동일한 요소 수를 포함한다 필요하다. 이 같은 꼬리를 구현하는 것이 경우에 따라서
val tail = seq.zipWithIndex().filter(_._2 > 0).map(_._1)
tail.zip (서열) 것하지 작업을 모두 수집이 각 파티션에 대한 요소의 동일한 수를 필요로하고 우리가 이전의 파티션으로 이동해야 각 파티션에 대해 하나 개의 요소가 있기 때문이다.
from https://stackoverflow.com/questions/34146907/operate-on-neighbor-elements-in-rdd-in-spark by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] w /를 unboundid LDAP의 SDK 스칼라에서 암호 변경하려고 할 때 어떻게 "WILL_NOT_PERFORM"MS의 AD 응답을 해결하는 방법은 무엇입니까? (0) | 2019.11.13 |
---|---|
[SCALA] 왜 스칼라 지원이 변수를 그림자는 무엇입니까? [닫은] (0) | 2019.11.13 |
[SCALA] 병합은 전체 스테이지의 평행도를 감소 (스파크) (0) | 2019.11.13 |
[SCALA] 스칼라 F-경계 유형 설명 (0) | 2019.11.13 |
[SCALA] tar.gz의 압축 여러 파일에 읽기가 불꽃에 보관 [중복] (0) | 2019.11.13 |