복붙노트

[HADOOP] 직렬화 및 사용자 정의 스파크 RDD 클래스

HADOOP

직렬화 및 사용자 정의 스파크 RDD 클래스

나는 스칼라에서 사용자 지정 점화 RDD 구현을 쓰고 있어요, 나는 스파크 쉘을 사용하여 내 구현을 디버깅하고있다. 지금 내 목표는 얻을 것입니다 :

customRDD.count

예외없이 성공합니다. 지금이 내가 갖는 것입니다 :

15/03/06 23:02:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
15/03/06 23:02:32 ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it.
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)

...

Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
    at java.io.ObjectStreamClass$FieldReflector.getObjFieldValues(ObjectStreamClass.java:2050)
    at java.io.ObjectStreamClass.getObjFieldValues(ObjectStreamClass.java:1252)
    ... 45 more

는 "작업 0 직렬화 실패"내 관심을 잡는다. 나는 customRDD.count을에 무슨 일이 일어나고 있는지의 뛰어난 정신 사진을하지 않아도, 그것은 직렬화 할 수없는 정확히 매우 불분명하다.

내 사용자 정의 RDD는 다음과 같이 구성

내 스파크 쉘 세션은 다음과 같습니다 :

import custom.rdd.stuff
import org.apache.spark.SparkContext

val conf = sc.getConf
conf.set(custom, parameters)
sc.stop
sc2 = new SparkContext(conf)
val mapOfThings: Map[String, String] = ...
myRdd = customRDD(sc2, mapOfStuff)
myRdd.count

... (exception output) ...

내가 알고 싶은 것은 :

이 문제에 대한 명확한 설명 정말 감사합니다.

해결법

  1. ==============================

    1.스파크 컨텍스트에서 실행 코드는 작업이에 실행하도록 지시하는 작업자 노드의 동일한 프로세스 경계 내에 존재해야합니다. 이 관리는 당신의 RDD 사용자 정의에서 참조 된 객체 나 값이 직렬화되도록주의해야한다는 것을 의미합니다. 객체가 직렬화 가능하지 않은 경우, 당신은 각 파티션이 해당 객체의 새로운 인스턴스를 가질 수 있도록 제대로 범위되었는지 확인해야합니다.

    스파크 컨텍스트에서 실행 코드는 작업이에 실행하도록 지시하는 작업자 노드의 동일한 프로세스 경계 내에 존재해야합니다. 이 관리는 당신의 RDD 사용자 정의에서 참조 된 객체 나 값이 직렬화되도록주의해야한다는 것을 의미합니다. 객체가 직렬화 가능하지 않은 경우, 당신은 각 파티션이 해당 객체의 새로운 인스턴스를 가질 수 있도록 제대로 범위되었는지 확인해야합니다.

    기본적으로, 당신은 당신의 스파크 드라이버에 선언 된 객체의 직렬화 인스턴스를 공유하고 상태가 클러스터의 다른 노드로 복제 할 기대할 수 없다.

    이 직렬화 가능하지 않은 오브젝트를 직렬화 할 수 없게됩니다 예입니다 :

    NotSerializable notSerializable = new NotSerializable();
    JavaRDD<String> rdd = sc.textFile("/tmp/myfile");
    
    rdd.map(s -> notSerializable.doSomething(s)).collect();
    

    아래의 예는 비 직렬화 오브젝트의 인스턴스의 상태를 직렬화 할 필요없이이 람다의 컨텍스트에 포함되어 있기 때문에, 제대로 다중 파티션에 분배 할 수 있고, 잘 작동 할 것이다. 이 또한 RDD 사용자 정의의 일부 (있는 경우)로 참조 직렬화 이적 종속성을 간다.

    rdd.forEachPartition(iter -> {
      NotSerializable notSerializable = new NotSerializable();
    
      // ...Now process iter
    });
    

    자세한 내용은 여기를 참조하십시오 : http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/troubleshooting/javaionotserializableexception.html

  2. ==============================

    2.케니의 설명뿐만 아니라, 당신이 문제의 원인을 확인하려면 직렬화 디버깅을 제안합니다. 종종 그냥 코드를보고 알아 내기 위해 인간적으로 불가능하다.

    케니의 설명뿐만 아니라, 당신이 문제의 원인을 확인하려면 직렬화 디버깅을 제안합니다. 종종 그냥 코드를보고 알아 내기 위해 인간적으로 불가능하다.

    -Dsun.io.serialization.extendedDebugInfo=true
    
  3. ==============================

    3.문제는 당신이 사용자 정의에 RDD 방식 (세관 RDD (SC2, 물건의 맵)) SparkContex (보일러 플레이트)를 전달하는 것입니다. 클래스는 또한에 SparkContext을 직렬화해야합니다.

    문제는 당신이 사용자 정의에 RDD 방식 (세관 RDD (SC2, 물건의 맵)) SparkContex (보일러 플레이트)를 전달하는 것입니다. 클래스는 또한에 SparkContext을 직렬화해야합니다.

  4. from https://stackoverflow.com/questions/28909680/serialization-and-custom-spark-rdd-class by cc-by-sa and MIT license