복붙노트

[REDIS] 스파크에 레디 스 : 작업이 직렬화하지

REDIS

스파크에 레디 스 : 작업이 직렬화하지

우리는 우리의 키 - 값 pairs.This를 캐시 스파크에 레디 스를 사용하면 코드입니다 :

import com.redis.RedisClient
val r = new RedisClient("192.168.1.101", 6379)
val perhit = perhitFile.map(x => {
    val arr = x.split(" ")
    val readId = arr(0).toInt
    val refId = arr(1).toInt
    val start = arr(2).toInt
    val end = arr(3).toInt
    val refStr = r.hmget("refStr", refId).get(refId).split(",")(1)
    val readStr = r.hmget("readStr", readId).get(readId)
    val realend = if(end > refStr.length - 1) refStr.length - 1 else end
    val refOneStr = refStr.substring(start, realend)
      (readStr, refOneStr, refId, start, realend, readId)
 })

하지만 컴파일러는 나에게 이런 피드백을 주었다 :

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
    at org.apache.spark.rdd.RDD.map(RDD.scala:270)
    at com.ynu.App$.main(App.scala:511)
    at com.ynu.App.main(App.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.redis.RedisClient
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 12 more

누군가가 어떻게 직렬화하는 데이터가 Redis.Thanks에서 많이 얻을 말해 수 없습니다.

해결법

  1. ==============================

    1.불꽃에서 (여기지도 등) RDDs의 기능 직렬화 및 처리 용 실행기에 송신. 이것은 그 작업에 포함 된 모든 요소가 직렬화되어야한다는 것을 의미한다.

    불꽃에서 (여기지도 등) RDDs의 기능 직렬화 및 처리 용 실행기에 송신. 이것은 그 작업에 포함 된 모든 요소가 직렬화되어야한다는 것을 의미한다.

    그것은이 만들어지는 기계에 바인딩 된 대상 DB에 대한 TCP 연결을 엽니로 여기 레디 스 연결은 직렬화되지 않습니다.

    용액 로컬 실행 컨텍스트에서, 실행기에 해당 연결을 생성하는 것이다. 그렇게 할 수있는 몇 가지 방법이있어. 마음에 팝업 두 있습니다 :

    그것을 필요로하는 모든 프로그램 구조에 작은 변화가 그대로 mapPartitions가 쉽습니다 :

    val perhit = perhitFile.mapPartitions{partition => 
        val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
        val res = partition.map{ x =>
            ...
            val refStr = r.hmget(...) // use r to process the local data
        }
        r.close // take care of resources
        res
    }
    

    싱글 톤 연결 관리자는 연결에 게으른 참조 보유하는 객체로 모델링 할 수있다 (참고 : 변경 가능한 심판 것 또한 일).

    object RedisConnection extends Serializable {
       lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
    }
    

    이 목적은 작업자 JVM 당 1 명 접속을 인스턴스화하는데 사용될 수 있고 조작으로 폐쇄 직렬화 오브젝트로서 사용된다.

    val perhit = perhitFile.map{x => 
        val param = f(x)
        val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
        }
    }
    

    단독 개체를 사용하는 이점은 (RDD 파티션 당 1 대조적으로) 연결이 JVM으로 한번만 생성되는 오버 헤드가 적다

    몇 가지 단점도 있습니다 위치 :

    예시 목적을 위해 제공 (*) 코드. 컴파일 또는 테스트하지 않습니다.

  2. ==============================

    2.당신은 클라이언트를 직렬화하기 위해 노력하고 있습니다. 당신은 당신이 다른 클러스터 노드에서 실행됩니다지도 내부에 사용하려는 것으로, 하나 RedisClient, R 있습니다. 개별적으로 클러스터 작업을 수행하기 전에 레디 스 밖으로 원하는 데이터를 얻을, 또는 아마도 각각의 행에 대해 새로운 레디 스 클라이언트를 만드는 등, 오히려지도보다 mapPartitions를 사용하여 맵 블록 내부의 각 클러스터 작업에 대해 개별적으로 클라이언트를 (아마도 만들 나쁜 생각).

    당신은 클라이언트를 직렬화하기 위해 노력하고 있습니다. 당신은 당신이 다른 클러스터 노드에서 실행됩니다지도 내부에 사용하려는 것으로, 하나 RedisClient, R 있습니다. 개별적으로 클러스터 작업을 수행하기 전에 레디 스 밖으로 원하는 데이터를 얻을, 또는 아마도 각각의 행에 대해 새로운 레디 스 클라이언트를 만드는 등, 오히려지도보다 mapPartitions를 사용하여 맵 블록 내부의 각 클러스터 작업에 대해 개별적으로 클라이언트를 (아마도 만들 나쁜 생각).

  3. from https://stackoverflow.com/questions/28006517/redis-on-sparktask-not-serializable by cc-by-sa and MIT license