복붙노트

[SCALA] 대규모 데이터 세트에 대한 COGROUP를 사용하는 방법

SCALA

대규모 데이터 세트에 대한 COGROUP를 사용하는 방법

나는이 두 RDD의 즉 발 tab_a : RDD [(문자열, 문자열)]와 val의 tab_b : RDD [(문자열, 문자열)] 내가 좋아하는 그 데이터 세트에 대한 cogroup을 사용하고 있습니다 :

val tab_c = tab_a.cogroup(tab_b).collect.toArray

val updated = tab_c.map { x =>
  {
 //somecode
  }
}

나는지도 기능에 cogrouped 값을 tab_c 사용하고 있는데 작은 데이터 세트에 대해 잘 작동하지만 거대한 데이터 세트의 경우는 메모리 부족 예외가 발생합니다.

나는 RDD에 대한 최종 값하지만 운이 같은 오류를 변환 시도

val newcos = spark.sparkContext.parallelize(tab_c)

대규모 데이터 세트에 대한 Cogroup를 사용하는 1.How?

2.Can는 우리가 cogrouped 값을 지속?

암호

 val source_primary_key = source.map(rec => (rec.split(",")(0), rec))
source_primary_key.persist(StorageLevel.DISK_ONLY)

val destination_primary_key = destination.map(rec => (rec.split(",")(0), rec))
destination_primary_key.persist(StorageLevel.DISK_ONLY)

val cos = source_primary_key.cogroup(destination_primary_key).repartition(10).collect()

  var srcmis: Array[String] = new Array[String](0)
var destmis: Array[String] = new Array[String](0)

var extrainsrc: Array[String] = new Array[String](0)
var extraindest: Array[String] = new Array[String](0)

var srcs: String = Seq("")(0)
var destt: String = Seq("")(0)

val updated = cos.map { x =>
  {

    val key = x._1
    val value = x._2

    srcs = value._1.mkString(",")
    destt = value._2.mkString(",")

    if (srcs.equalsIgnoreCase(destt) == false && destt != "") {
      srcmis :+= srcs
      destmis :+= destt

    }

    if (srcs == "") {

      extraindest :+= destt.mkString("")
    }

    if (destt == "") {

      extrainsrc :+= srcs.mkString("")
    }

  }

}

코드 업데이트 :

  val tab_c = tab_a.cogroup(tab_b).filter(x => x._2._1 =!= x => x._2._2)
 // tab_c = {1,Compactbuffer(1,john,US),Compactbuffer(1,john,UK)}
      {2,Compactbuffer(2,john,US),Compactbuffer(2,johnson,UK)}..

오류:

 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(4,3,ResultTask,FetchFailed(null,0,-1,27,org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)


ERROR YarnScheduler: Lost executor 8 on datanode1: Container killed by YARN for exceeding memory limits. 1.0 GB of 1020 MB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

감사합니다

해결법

  1. ==============================

    1.수집 사용할 때 () 당신은 기본적으로 쉽게 병목 현상을 생산할 수있는 마스터 노드에 다시 모든 결과 데이터를 이동하는 스파크를 말하고있다. 당신은 더 이상 그 시점에서 단일 시스템에서 단지 일반 배열을 불꽃을 사용하지 않습니다.

    수집 사용할 때 () 당신은 기본적으로 쉽게 병목 현상을 생산할 수있는 마스터 노드에 다시 모든 결과 데이터를 이동하는 스파크를 말하고있다. 당신은 더 이상 그 시점에서 단일 시스템에서 단지 일반 배열을 불꽃을 사용하지 않습니다.

    집행 분산 파일 시스템의 상단에 살고있는 이유를 계산 그냥 모든 노드에 데이터를 필요로 뭔가를 사용하여 트리거하려면, 그건. 예를 saveAsTextFile를 들어 ().

    다음은 몇 가지 기본적인 예입니다.

    전체 목표 여기가 (당신이 빅 데이터가있는 경우 즉,) 데이터에 코드를 이동하고있다 계산, 계산에 모든 데이터를 가지고하지 않는 것입니다 기억하십시오.

  2. ==============================

    2.TL; DR은 수집하지 않습니다.

    TL; DR은 수집하지 않습니다.

    (작업자 노드에 대한 평균 요구 사항에 크게 작아 질 수 있음) 추가 가정하지 않고, 안전이 코드를 실행하려면, 모든 노드 (드라이버와 각 집행은) 상당히 모든 데이터에 대한 전체 메모리 요구 사항을 초과하는 메모리를 필요로한다.

    당신이 그것을 외부에서 스파크를 실행한다면 당신은 하나의 노드가 필요합니다. 따라서 스파크는 여기에 어떤 혜택을 제공하지 않습니다.

    당신이 collect.toArray를 생략하고 데이터 배포에 대한 몇 가지 가정을하지만 경우에 당신은 잘 실행될 수 있습니다.

  3. from https://stackoverflow.com/questions/47180307/how-to-use-cogroup-for-large-datasets by cc-by-sa and MIT license