복붙노트

[SCALA] 스칼라 스파크에서 NullPointerException이, 컬렉션 유형으로 발생하는 것으로 나타납니다?

SCALA

스칼라 스파크에서 NullPointerException이, 컬렉션 유형으로 발생하는 것으로 나타납니다?

sessionIdList 유형이다 :

scala> sessionIdList
res19: org.apache.spark.rdd.RDD[String] = MappedRDD[17] at distinct at <console>:30

나는 코드 아래 실행하려고하면 :

val x = sc.parallelize(List(1,2,3)) 
val cartesianComp = x.cartesian(x).map(x => (x))

val kDistanceNeighbourhood = sessionIdList.map(s => {
    cartesianComp.filter(v => v != null)
})

kDistanceNeighbourhood.take(1)

나는 예외가 나타납니다

14/05/21 16:20:46 ERROR Executor: Exception in task ID 80
java.lang.NullPointerException
        at org.apache.spark.rdd.RDD.filter(RDD.scala:261)
        at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:38)
        at $line94.$read$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)

내가 사용하지만 경우 :

val l = sc.parallelize(List("1","2")) 
val kDistanceNeighbourhood = l.map(s => {    
    cartesianComp.filter(v => v != null)
})

kDistanceNeighbourhood.take(1)

그런 다음 예외가 표시되지 않습니다

두 코드 단편의 차이는 제 니펫 sessionIdList 타입의 것이있다 :

res19: org.apache.spark.rdd.RDD[String] = MappedRDD[17] at distinct at <console>:30

및 제 코드에서 "L"은 형인

scala> l
res13: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[32] at parallelize at <console>:12

왜이 오류가 발생?

나는이 문제를 해결하기 위해 ParallelCollectionRDD에 sessionIdList을 변환해야합니까?

해결법

  1. ==============================

    1.스파크는 RDDs의 중첩 (같은 문제의 또 다른 발생을 https://stackoverflow.com/a/14130534/590203 참조)를 지원하지 않습니다, 그래서 당신은 다른 RDD 작업의 내부 RDDs의 변형 또는 작업을 수행 할 수 없습니다.

    스파크는 RDDs의 중첩 (같은 문제의 또 다른 발생을 https://stackoverflow.com/a/14130534/590203 참조)를 지원하지 않습니다, 그래서 당신은 다른 RDD 작업의 내부 RDDs의 변형 또는 작업을 수행 할 수 없습니다.

    이 드라이버가 아닌 노동자에만 존재의 SparkContext 개체에 액세스하려고 할 때 첫 번째 경우에, 당신은 작업자에 의해 발생 NullPointerException이보고있다.

    두 번째 경우, 내 직감 작업이 드라이버에서 로컬로 실행 사고에 의해 순수하게 일했다입니다.

  2. ==============================

    2.그것의 합리적인 질문 나는 충분히 배를 그에게 물었다 들었습니다. 나는 이것이 사실 왜 도움이 될 수 있기 때문에, 설명에 자상을하려고하는거야.

    그것의 합리적인 질문 나는 충분히 배를 그에게 물었다 들었습니다. 나는 이것이 사실 왜 도움이 될 수 있기 때문에, 설명에 자상을하려고하는거야.

    중첩 된 RDDs는 항상 생산에 예외가 발생합니다. 나는 그것이 RDD 작업을 내부에 RDD 작업을 호출하는 것을 의미 경우, 여기를 설명하는 생각으로 중첩 기능은 원인이 실제로 같은 일이기 때문에 고장의 원인이됩니다 호출합니다. (RDDs 그래서 이러한 "매핑"로서 RDD 동작을 수행 불변 새로운 RDD을 만드는 것과 같다.) 중첩 RDDs 생성하는 능력에가 RDD 정의하는 방법의 필연적 스파크 출원 인 방법 설정.

    RDD는 스파크 실행자에 살고 객체 (라는 파티션)의 분산 모음입니다. 스파크 집행에만 스파크 드라이버를 사용하여 서로 통신 할 수 없습니다. RDD 작업은 모든 RDD의 집행 환경은 재귀하지 않습니다이 partitions.Because에 조각에서 계산되지 않는다 (즉, 당신이 하위 집행과 스파크 집행에있을 불꽃 드라이버를 구성 할 수 있습니다) 둘 RDD을 수행 할 수 있습니다.

    프로그램에서는 정수의 파티션의 분산 수집을 만들었습니다. 그런 다음 매핑 작업을 수행하고 있습니다. 스파크 드라이버가 매핑 작업을 볼 때, 그것은 병렬로 각 파티션의 변환을 수행하는 실행 프로그램에 매핑을 수행 할 지침을 보냅니다. 각 파티션에 다른 분산 작업을 수행 할 수있는 "전체 RDD"을 호출하려고하기 때문에 그러나 당신의 매핑은 할 수 없습니다. 그것은 한 경우 각 파티션이 다른 파티션에 대한 정보에 액세스 할 수 없기 때문에 이것은, 수행 할 수 없습니다 수없는, 계산을 병렬로 실행할 수 없습니다.

    무엇 (필터를하고 있으며, 필터가 sessionIdList에 대한 정보를 필요로하지 않기 때문에) 당신은지도에 필요한 데이터는 아마 작기 때문에 당신이 대신 할 수있는 것은 첫 번째 세션 ID의 목록을 필터링하는 것입니다. 그런 다음 드라이버에 그 목록을 수집합니다. 그런 다음지도에서 사용할 수있는 집행,에 방송. sessionid를 목록이 너무 큰 경우, 당신은 아마 조인을 수행해야합니다.

  3. from https://stackoverflow.com/questions/23793117/nullpointerexception-in-scala-spark-appears-to-be-caused-be-collection-type by cc-by-sa and MIT license