복붙노트

[HADOOP] 스파크 RDD에서 combineByKey 및 aggregateByKey없이 지정된 출력을 얻는 방법

HADOOP

스파크 RDD에서 combineByKey 및 aggregateByKey없이 지정된 출력을 얻는 방법

다음은 내 데이터는 다음과 같습니다

val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", bar=C","bar=D", "bar=D")  

지금은 출력 있지만 combineByKey 및 aggregateByKey를 사용하지 않고 유형 아래에서 원하는 :

1) Array[(String, Int)] = Array((foo,5), (bar,3))  
2) Array((foo,Set(B, A)),
(bar,Set(C, D)))  

다음은 내 시도는 다음과 같습니다

scala> val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C",
     | "bar=D", "bar=D")  
scala> val sample=keysWithValuesList.map(_.split("=")).map(p=>(p(0),(p(1))))
sample: Array[(String, String)] = Array((foo,A), (foo,A), (foo,A), (foo,A), (foo,B), (bar,C), (bar,D), (bar,D))  

내가 매핑 된 RDD에 대한 적용 방법을 볼 탭 다음에 변수 이름을 입력 할 때 지금은 내 요구 사항을 충족 할 수있는 없음에서 아래의 옵션을 볼 수 있습니다 :

scala> sample.
apply          asInstanceOf   clone          isInstanceOf   length         toString       update         

그래서 내가 어떻게 이것을 달성 할 수?

해결법

  1. ==============================

    1.여기에 표준 접근 방식이다.

    여기에 표준 접근 방식이다.

    점에 유의하십시오 : 당신은 RDD 작업을해야합니다. 나는 그 병목 생각합니다.

    여기 요 :

    val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C","bar=D", "bar=D") 
    
    val sample=keysWithValuesList.map(_.split("=")).map(p=>(p(0),(p(1))))
    
    val sample2 = sc.parallelize(sample.map(x => (x._1, 1)))
    val sample3 = sample2.reduceByKey(_+_) 
    sample3.collect()
    
    val sample4 = sc.parallelize(sample.map(x => (x._1, x._2))).groupByKey()   
    sample4.collect()
    
    val sample5 = sample4.map(x => (x._1, x._2.toSet))
    sample5.collect()
    
  2. from https://stackoverflow.com/questions/53150584/how-to-get-the-specified-output-without-combinebykey-and-aggregatebykey-in-spark by cc-by-sa and MIT license