[HADOOP] 각 파티션의 요소 수가 같은 동일한 크기의 파티션으로 구성된 Spark RDD의 사용자 정의 파티션을 정의하는 방법은 무엇입니까?
HADOOP각 파티션의 요소 수가 같은 동일한 크기의 파티션으로 구성된 Spark RDD의 사용자 정의 파티션을 정의하는 방법은 무엇입니까?
나는 스파크가 처음이다. 나는 [RDD] 요소의 큰 데이터 집합을 가지고 있으며 그것을 요소의 순서를 유지하면서 똑같은 크기의 두 개의 분할 영역으로 나누고 싶습니다. 나는 RangePartitioner를 사용하여
var data = partitionedFile.partitionBy(new RangePartitioner(2, partitionedFile))
이는 대략 동일하지만 크기가 동일하지 않은 요소의 유지 순서를 대략 나눌 수 있기 때문에 만족스러운 결과를 제공하지 못합니다. 예를 들어, 64 개의 요소가 있다면, 우리는 Rangepartitioner는 31 개의 요소와 33 개의 요소로 나뉩니다.
나는 처음 32 개 요소를 반으로, 나머지 절반은 32 개 요소를 두 번째로 포함하는 분할 자 (partitioner)가 필요합니다. 사용자 정의 파 티셔 터를 사용하여 요소의 순서를 유지하면서 동일한 크기의 두 반쪽을 사용하는 방법을 제안하여 도움을받을 수 있습니까?
해결법
-
==============================
1.파티션은 파티션에 키를 할당하여 작동합니다. 그러한 분배자를 만들기 위해 키 배포에 대한 사전 지식이 필요하거나 모든 키를 살펴야합니다. 이것이 스파크가 당신에게 제공하지 않는 이유입니다.
파티션은 파티션에 키를 할당하여 작동합니다. 그러한 분배자를 만들기 위해 키 배포에 대한 사전 지식이 필요하거나 모든 키를 살펴야합니다. 이것이 스파크가 당신에게 제공하지 않는 이유입니다.
일반적으로 그런 분할자를 필요로하지 않습니다. 사실 같은 크기의 파티션이 필요한 유스 케이스를 만들 수 없습니다. 요소의 수가 이상한 경우에는 어떻게해야합니까?
어쨌든, 순차적 인 Ints에 의해 키가 입력 된 RDD가 있고 합계가 몇 개인 지 알 수 있습니다. 그런 다음 아래와 같이 사용자 정의 Partitioner를 작성할 수 있습니다.
class ExactPartitioner[V]( partitions: Int, elements: Int) extends Partitioner { def getPartition(key: Any): Int = { val k = key.asInstanceOf[Int] // `k` is assumed to go continuously from 0 to elements-1. return k * partitions / elements } }
-
==============================
2.이 대답은 Daniel로부터 영감을 얻었지만 사람들의 복사 및 붙여 넣기에 대한 예제가있는 전체 구현 (포주 라이브러리 패턴 사용)을 제공합니다. :)
이 대답은 Daniel로부터 영감을 얻었지만 사람들의 복사 및 붙여 넣기에 대한 예제가있는 전체 구현 (포주 라이브러리 패턴 사용)을 제공합니다. :)
import RDDConversions._ trait RDDWrapper[T] { def rdd: RDD[T] } // TODO View bounds are deprecated, should use context bounds // Might need to change ClassManifest for ClassTag in spark 1.0.0 case class RichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest]( rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] { // Here we use a single Long to try to ensure the sort is balanced, // but for really large dataset, we may want to consider // using a tuple of many Longs or even a GUID def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] = rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey() .grouped(numPartitions).map(t => (t._1._1, t._2)) } case class RichRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] { def grouped(size: Int): RDD[T] = { // TODO Version where withIndex is cached val withIndex = rdd.mapPartitions(_.zipWithIndex) val startValues = withIndex.mapPartitionsWithIndex((i, iter) => Iterator((i, iter.toIterable.last))).toArray().toList .sortBy(_._1).map(_._2._2.toLong).scan(-1L)(_ + _).map(_ + 1L) withIndex.mapPartitionsWithIndex((i, iter) => iter.map { case (value, index) => (startValues(i) + index.toLong, value) }) .partitionBy(new Partitioner { def numPartitions: Int = size def getPartition(key: Any): Int = (key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt }) .map(_._2) } }
그런 다음 다른 파일에서
// TODO modify above to be implicit class, rather than have implicit conversions object RDDConversions { implicit def toRichRDD[T: ClassManifest](rdd: RDD[T]): RichRDD[T] = new RichRDD[T](rdd) implicit def toRichPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest]( rdd: RDD[(K, V)]): RichPairRDD[K, V] = RichPairRDD(rdd) implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd }
그런 다음 당신의 유스 케이스에 당신은 단지 (이미 정렬되었다고 가정하고)
import RDDConversions._ yourRdd.grouped(2)
면책 조항 : 테스트하지 않은, 좀 그냥 바로이 답변에 쓴
from https://stackoverflow.com/questions/23127329/how-to-define-custom-partitioner-for-spark-rdds-of-equally-sized-partition-where by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] hadoop map 보조 정렬 줄이기 (0) | 2019.05.28 |
---|---|
[HADOOP] Java 프로그램에서 Sqoop을 사용하는 방법? (0) | 2019.05.28 |
[HADOOP] 이름 노드가 안전 모드입니다. 떠날 수 없다. (0) | 2019.05.28 |
[HADOOP] hadoop의 모든 액션 전에 ugi.checkTGTAndReloginFromKeytab ()을 호출해야합니까? (0) | 2019.05.28 |
[HADOOP] 기술적으로 s3n, s3a 및 s3의 차이점은 무엇입니까? (0) | 2019.05.28 |