[REDIS] 스파크에 레디 스 : 작업이 직렬화하지
REDIS스파크에 레디 스 : 작업이 직렬화하지
우리는 우리의 키 - 값 pairs.This를 캐시 스파크에 레디 스를 사용하면 코드입니다 :
import com.redis.RedisClient
val r = new RedisClient("192.168.1.101", 6379)
val perhit = perhitFile.map(x => {
val arr = x.split(" ")
val readId = arr(0).toInt
val refId = arr(1).toInt
val start = arr(2).toInt
val end = arr(3).toInt
val refStr = r.hmget("refStr", refId).get(refId).split(",")(1)
val readStr = r.hmget("readStr", readId).get(readId)
val realend = if(end > refStr.length - 1) refStr.length - 1 else end
val refOneStr = refStr.substring(start, realend)
(readStr, refOneStr, refId, start, realend, readId)
})
하지만 컴파일러는 나에게 이런 피드백을 주었다 :
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.map(RDD.scala:270)
at com.ynu.App$.main(App.scala:511)
at com.ynu.App.main(App.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.redis.RedisClient
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 12 more
누군가가 어떻게 직렬화하는 데이터가 Redis.Thanks에서 많이 얻을 말해 수 없습니다.
해결법
-
==============================
1.불꽃에서 (여기지도 등) RDDs의 기능 직렬화 및 처리 용 실행기에 송신. 이것은 그 작업에 포함 된 모든 요소가 직렬화되어야한다는 것을 의미한다.
불꽃에서 (여기지도 등) RDDs의 기능 직렬화 및 처리 용 실행기에 송신. 이것은 그 작업에 포함 된 모든 요소가 직렬화되어야한다는 것을 의미한다.
그것은이 만들어지는 기계에 바인딩 된 대상 DB에 대한 TCP 연결을 엽니로 여기 레디 스 연결은 직렬화되지 않습니다.
용액 로컬 실행 컨텍스트에서, 실행기에 해당 연결을 생성하는 것이다. 그렇게 할 수있는 몇 가지 방법이있어. 마음에 팝업 두 있습니다 :
그것을 필요로하는 모든 프로그램 구조에 작은 변화가 그대로 mapPartitions가 쉽습니다 :
val perhit = perhitFile.mapPartitions{partition => val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation val res = partition.map{ x => ... val refStr = r.hmget(...) // use r to process the local data } r.close // take care of resources res }
싱글 톤 연결 관리자는 연결에 게으른 참조 보유하는 객체로 모델링 할 수있다 (참고 : 변경 가능한 심판 것 또한 일).
object RedisConnection extends Serializable { lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379) }
이 목적은 작업자 JVM 당 1 명 접속을 인스턴스화하는데 사용될 수 있고 조작으로 폐쇄 직렬화 오브젝트로서 사용된다.
val perhit = perhitFile.map{x => val param = f(x) val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data } }
단독 개체를 사용하는 이점은 (RDD 파티션 당 1 대조적으로) 연결이 JVM으로 한번만 생성되는 오버 헤드가 적다
몇 가지 단점도 있습니다 위치 :
예시 목적을 위해 제공 (*) 코드. 컴파일 또는 테스트하지 않습니다.
-
==============================
2.당신은 클라이언트를 직렬화하기 위해 노력하고 있습니다. 당신은 당신이 다른 클러스터 노드에서 실행됩니다지도 내부에 사용하려는 것으로, 하나 RedisClient, R 있습니다. 개별적으로 클러스터 작업을 수행하기 전에 레디 스 밖으로 원하는 데이터를 얻을, 또는 아마도 각각의 행에 대해 새로운 레디 스 클라이언트를 만드는 등, 오히려지도보다 mapPartitions를 사용하여 맵 블록 내부의 각 클러스터 작업에 대해 개별적으로 클라이언트를 (아마도 만들 나쁜 생각).
당신은 클라이언트를 직렬화하기 위해 노력하고 있습니다. 당신은 당신이 다른 클러스터 노드에서 실행됩니다지도 내부에 사용하려는 것으로, 하나 RedisClient, R 있습니다. 개별적으로 클러스터 작업을 수행하기 전에 레디 스 밖으로 원하는 데이터를 얻을, 또는 아마도 각각의 행에 대해 새로운 레디 스 클라이언트를 만드는 등, 오히려지도보다 mapPartitions를 사용하여 맵 블록 내부의 각 클러스터 작업에 대해 개별적으로 클라이언트를 (아마도 만들 나쁜 생각).
from https://stackoverflow.com/questions/28006517/redis-on-sparktask-not-serializable by cc-by-sa and MIT license
'REDIS' 카테고리의 다른 글
[REDIS] 레디 스에서 Dicts 목록 (0) | 2020.01.07 |
---|---|
[REDIS] 두 교차로 또는 그 이상의 분류 세트 (0) | 2020.01.07 |
[REDIS] Node.js를 루핑 비동기 처리를위한 가장 좋은 패턴 (0) | 2020.01.07 |
[REDIS] 레디 스 멀티 - 매개 변수 일치 찾기 (0) | 2020.01.06 |
[REDIS] 스크립트 전역 변수를 작성하려고 (0) | 2020.01.06 |