복붙노트

[SCALA] 작업하지 직렬화 : java.io.NotSerializableException 클래스 만하지 개체에 대한 폐쇄 외부 함수를 호출 할 때

SCALA

작업하지 직렬화 : java.io.NotSerializableException 클래스 만하지 개체에 대한 폐쇄 외부 함수를 호출 할 때

폐쇄의 외부 함수를 호출 할 때 이상한 행동을 얻기 :

문제는 내가 객체 클래스와 나의 코드가 필요하다. 어떤 생각이 왜 이런 일이 일어나고 있는가? 스칼라 객체 (기본값?) 연재되어 있습니까?

이 작업 코드 예제입니다 :

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}

이것은 작동하지 않는 예이다 :

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable wont help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}

해결법

  1. ==============================

    1.나는 다른 대답은 완전히 올바른 생각하지 않습니다. RDDs은 참으로 직렬화, 그래서 이것은 당신의 작업이 실패 할 일으키는 것이 아니다.

    나는 다른 대답은 완전히 올바른 생각하지 않습니다. RDDs은 참으로 직렬화, 그래서 이것은 당신의 작업이 실패 할 일으키는 것이 아니다.

    스파크가 분산 컴퓨팅 엔진 및 메인 추상화 분산 집합으로 볼 수있는 탄성 분산 데이터 셋 (RDD)이다. 기본적으로, RDD의 요소는 클러스터의 노드에 분할되어 있지만, 스파크는 로컬 한 것처럼 사용자가 RDD (컬렉션)와 상호 작용시키는 사용자로부터이 멀리 추상화합니다.

    너무 많은 세부 사항으로 얻을 수 있지만, 당신은 RDD에 (지도, flatMap, 필터 등)을 다른 변환을 실행할 때, 당신의 변환 코드 (폐쇄)입니다하지 않기 :

    물론 당신이 (당신의 예에서와 같이) 로컬이를 실행할 수 있지만 모든 단계는 (떨어져 네트워크를 통해 운송에서) 여전히 발생합니다. [이도 생산에 배포하기 전에 당신이 어떤 버그를 잡을 수 있습니다]

    어떤 두 번째 경우에 발생하면 map 함수 내부에서 클래스 테스트에 정의 된 메소드를 호출하는 것입니다. 스파크는 그것을보고 방법이 독자적으로 직렬화 할 수 없기 때문에 다른 JVM에서 실행될 때 코드가 여전히 작동 할 수 있도록, 스파크, 전체 테스트 클래스를 직렬화하려고합니다. 당신은 두 가지 가능성이있다 :

    학급 전체가 불꽃에 의해 직렬화 할 수 있도록 어느 당신은 클래스 테스트를 직렬화합니다

    import org.apache.spark.{SparkContext,SparkConf}
    
    object Spark {
      val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
    }
    
    object NOTworking extends App {
      new Test().doIT
    }
    
    class Test extends java.io.Serializable {
      val rddList = Spark.ctx.parallelize(List(1,2,3))
    
      def doIT() =  {
        val after = rddList.map(someFunc)
        after.collect().foreach(println)
      }
    
      def someFunc(a: Int) = a + 1
    }
    

    또는 당신은 불꽃이 직렬화 할 수 있도록 (함수는 스칼라의 객체가) 대신 방법을 someFunc 기능을합니다

    import org.apache.spark.{SparkContext,SparkConf}
    
    object Spark {
      val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
    }
    
    object NOTworking extends App {
      new Test().doIT
    }
    
    class Test {
      val rddList = Spark.ctx.parallelize(List(1,2,3))
    
      def doIT() =  {
        val after = rddList.map(someFunc)
        after.collect().foreach(println)
      }
    
      val someFunc = (a: Int) => a + 1
    }
    

    클래스 직렬화와 비슷한,하지만 같은 문제가 관심을 가질 수 있으며,이 스파크 정상 회의 2013 프리젠 테이션에 읽을 수 있습니다.

    보조 노트로서, 당신은 rddList.map에 rddList.map (someFunc (_)) (someFunc를) 다시 작성할 수 있습니다, 그들은 정확히 동일합니다. 덜 상세하고 읽기 청소기의로 일반적으로, 두 번째는 바람직하다.

    EDIT (2015년 3월 15일) : SPARK-5307은 SerializationDebugger를 도입 1.3.0을 사용하는 첫 번째 버전입니다 스파크. 그것은 가지는 NotSerializableException에 직렬화 경로를 추가합니다. 가지는 NotSerializableException가 발생하면 디버거는 직렬화 할 수없는 대상을 향해 경로를 찾기 위해 객체 그래프를 방문하고, 객체를 찾기 위해 사용자에게 도움이되는 정보를 구성한다.

    영업 이익의 경우,이 표준 출력에 인쇄됩니다 것입니다 :

    Serialization stack:
        - object not serializable (class: testing, value: testing@2dfe2f00)
        - field (class: testing$$anonfun$1, name: $outer, type: class testing)
        - object (class testing$$anonfun$1, <function1>)
    
  2. ==============================

    2.그레의 대답은 원래의 코드가 문제를 해결하는 두 가지 방법을 작동하지 않는 이유를 설명하는 데 유용합니다. 그러나,이 솔루션은 매우 유연하지 않다; 당신의 폐쇄는 제어 할 수없는 비 직렬화 가능 클래스의 메소드 호출을 포함하는 경우를 고려한다. 이 클래스에 직렬화 태그를 추가하지 않고 함수로 방법을 변경하기 위해 기본이되는 구현을 변경할 수 있습니다 둘.

    그레의 대답은 원래의 코드가 문제를 해결하는 두 가지 방법을 작동하지 않는 이유를 설명하는 데 유용합니다. 그러나,이 솔루션은 매우 유연하지 않다; 당신의 폐쇄는 제어 할 수없는 비 직렬화 가능 클래스의 메소드 호출을 포함하는 경우를 고려한다. 이 클래스에 직렬화 태그를 추가하지 않고 함수로 방법을 변경하기 위해 기본이되는 구현을 변경할 수 있습니다 둘.

    Nilesh이에 대한 좋은 해결 방법을 제시하지만, 솔루션은보다 간결하고 일반적인 만들 수 있습니다 :

    def genMapper[A, B](f: A => B): A => B = {
      val locker = com.twitter.chill.MeatLocker(f)
      x => locker.get.apply(x)
    }
    

    이 기능 - 시리얼 라이저는 자동 폐쇄 및 메서드 호출을 래핑하는 데 사용할 수 있습니다 :

    rdd map genMapper(someFunc)
    

    트위터의 칠이 이미 핵심 스파크에 의해 당겨 때문에이 기술은 KryoSerializationWrapper에 액세스하기 위해 추가 상어 종속성을 필요로하지 않는 이점이있다

  3. ==============================

    3.https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- : 전체 이야기는 완전히이 직렬화 문제를 방지하는 방법을 이동하는 큰 패러다임을 제안하는 문제를 설명 leaks-no-ws.md

    https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory- : 전체 이야기는 완전히이 직렬화 문제를 방지하는 방법을 이동하는 큰 패러다임을 제안하는 문제를 설명 leaks-no-ws.md

    정상은 대답은 기본적으로 전체 언어 기능을 버리고 제안되어 투표를하지 않습니다 - 즉 더 이상 방법을 사용하여 만 기능을 사용하여. 실제로 클래스의 기능을 프로그래밍 방법에 피해야하지만, 함수로 돌리면 여기에 설계 문제를 해결하지 않습니다 (위의 링크 참조)해야합니다.

    이 특정 상황에서 빠른 수정으로 당신은 단지 잘못된 값에는 직렬화하려고하지를 말 할 수 @Transient 주석을 사용할 수있다 (여기서, Spark.ctx 사용자 정의 클래스의 OP의 이름 아래 하나의 불꽃되지 않습니다)

    @transient
    val rddList = Spark.ctx.parallelize(list)
    

    rddList 다른 곳에 살고 그래서 당신은 또한 코드를 재구성 할 수 있지만, 그 또한 불쾌한입니다.

    미래 스칼라에서 우리가 수행하고 정확하게 폐쇄에 의해 뽑아되지 않습니다 어떤 입자 제어를 세밀하게 할 수 있도록해야한다 "포자"라고 이런 일이 포함됩니다. 또한이 오히려 끔찍한 런타임 예외 / 메모리 누수되는 지금보다 컴파일 오류로 실수로 직렬화 유형 당기는 모든 실수 (또는 원하지 않는 값)을 설정해야합니다.

    http://docs.scala-lang.org/sips/pending/spores.html

    KYRO를 사용하는 경우, 그 등록이 필요 그래서, 이것은 당신이 대신 메모리 누수의 오류를 얻을 의미합니다합니다

    내가 등록하지 않은 경우 "마지막으로, 나는 kryo이 kryo.setRegistrationOptional (true)를 가지고 있음을 알고 있지만이 옵션이 설정됩니다. 그것을 사용하는 방법을 알아 내려고 매우 어려운 시간을 보내고 있어요 kryo은 여전히 ​​예외를 던질 것으로 보인다 클래스."

    kryo와 클래스를 등록하는 전략

    물론 이것은 단지 사용자가 입력하는 수준의 제어를하지 값 수준을 제어 할 수 있습니다.

    ... 더 많은 아이디어를 제공합니다.

  4. ==============================

    4.나는 다른 접근 방식을 사용하여이 문제를 해결했다. 당신은 단순히 폐쇄를 통과하기 전에 개체를 직렬화하고, 이후 역 직렬화 할 필요가있다. 이 방법은 그냥 뒤에서 Kryo를 사용하기 때문에 수업은, 직렬화 가능하지 않은 경우에도 작동합니다. 당신이 필요로하는 일부 카레입니다. ;)

    나는 다른 접근 방식을 사용하여이 문제를 해결했다. 당신은 단순히 폐쇄를 통과하기 전에 개체를 직렬화하고, 이후 역 직렬화 할 필요가있다. 이 방법은 그냥 뒤에서 Kryo를 사용하기 때문에 수업은, 직렬화 가능하지 않은 경우에도 작동합니다. 당신이 필요로하는 일부 카레입니다. ;)

    여기에 내가 그것을 어떻게의 예입니다 :

    def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
                   (foo: Foo) : Bar = {
        kryoWrapper.value.apply(foo)
    }
    val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
    rdd.flatMap(mapper).collectAsMap()
    
    object Blah(abc: ABC) extends (Foo => Bar) {
        def apply(foo: Foo) : Bar = { //This is the real function }
    }
    

    복잡로 어쩌구을 자유롭게 당신이 원하는대로, 클래스, 동반자 객체, 중첩 클래스, 다수의 제 3 자 libs와에 대한 참조.

    https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala : KryoSerializationWrapper은을 말한다

  5. ==============================

    5.나는 유사한 문제에 직면, 나는 그레의 대답은에서 이해

    나는 유사한 문제에 직면, 나는 그레의 대답은에서 이해

    object NOTworking extends App {
     new testing().doIT
    }
    //adding extends Serializable wont help
    class testing {
    
    val list = List(1,2,3)
    
    val rddList = Spark.ctx.parallelize(list)
    
    def doIT =  {
      //again calling the fucntion someFunc 
      val after = rddList.map(someFunc(_))
      //this will crash (spark lazy)
      after.collect().map(println(_))
    }
    
    def someFunc(a:Int) = a+1
    
    }
    

    당신의 작은 동전 방법은 someFunc (_) 메소드를 직렬화하려고하지만 방법으로 직렬화하지 않습니다, 그것은 다시 직렬화 할 수없는 수준의 테스트를 직렬화하려고합니다.

    그래서 당신은 DOIT 방법 내부 someFunc을 정의해야합니다, 당신의 코드가 작동합니다. 예를 들면 :

    def doIT =  {
     def someFunc(a:Int) = a+1
      //function definition
     }
     val after = rddList.map(someFunc(_))
     after.collect().map(println(_))
    }
    

    사진에 나오는 여러 기능이있는 경우, 그때 모든 기능은 상위 컨텍스트에 사용할 수 있습니다.

  6. ==============================

    6.나는 자바에서, 나는 폐쇄가 아닌 직렬화 최종 필드에 액세스하지 않도록 내 코드를 리팩토링하여 가지는 NotSerializableException 해결이 스칼라에 적용 완전히 확실하지 않다지만.

    나는 자바에서, 나는 폐쇄가 아닌 직렬화 최종 필드에 액세스하지 않도록 내 코드를 리팩토링하여 가지는 NotSerializableException 해결이 스칼라에 적용 완전히 확실하지 않다지만.

  7. from https://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou by cc-by-sa and MIT license