[SCALA] 스파크 : DB 점화 RDD 파티션 당 연결 및 수행 mapPartition
SCALA스파크 : DB 점화 RDD 파티션 당 연결 및 수행 mapPartition
나는 나의 불꽃 RDD에 mapPartitions을 수행 할
val newRd = myRdd.mapPartitions(
partition => {
val connection = new DbConnection /*creates a db connection per partition*/
val newPartition = partition.map(
record => {
readMatchingFromDB(record, connection)
})
connection.close()
newPartition
})
그러나,이 컨트롤이 .MAP에 도달하기 전에하기 때문에 예상 () 내 연결이 종료 될 때, 이미 나에게 폐쇄 예외를 연결을 제공합니다. 나는 RDD 파티션 당 연결을 만들고, 그것을 제대로 닫으려면. 이걸 어떻게 달성 할 수 있습니까?
감사!
해결법
-
==============================
1.여기 토론에서 언급 한 바와 같이 - 문제는 반복자 파티션에지도 작업의 게으름에서 유래한다. (RDD이 작용 될 때), 각 파티션의 접속은 뒤쪽 닫힐 생성되고 있음이 게으름 수단, readMatchingFromDB 호출된다.
여기 토론에서 언급 한 바와 같이 - 문제는 반복자 파티션에지도 작업의 게으름에서 유래한다. (RDD이 작용 될 때), 각 파티션의 접속은 뒤쪽 닫힐 생성되고 있음이 게으름 수단, readMatchingFromDB 호출된다.
이 문제를 해결하려면, 당신은 예를 들어, 연결을 닫기 전에 반복자의 열망 통과를 강제해야한다 (그때 등)리스트로 변환하여 :
val newRd = myRdd.mapPartitions(partition => { val connection = new DbConnection /*creates a db connection per partition*/ val newPartition = partition.map(record => { readMatchingFromDB(record, connection) }).toList // consumes the iterator, thus calls readMatchingFromDB connection.close() newPartition.iterator // create a new iterator })
-
==============================
2.
rdd.foreachPartitionAsync(iterator->{ // this object will be cached inside each executor JVM. For the first time, the //connection will be created and hence forward, it will be reused. // Very useful for streaming apps DBConn conn=DBConn.getConnection() while(iterator.hasNext()) { conn.read(); } }); public class DBConn{ private static dbObj=null; //Create a singleton method that returns only one instance of this object } }
from https://stackoverflow.com/questions/37881042/spark-db-connection-per-spark-rdd-partition-and-do-mappartition by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 스칼라 - 하나의 목록으로 목록의 변환 목록 : 목록 [목록 [A] 목록 [A]에 (0) | 2019.11.22 |
---|---|
[SCALA] 어떻게 스칼라에서 테스트 리소스에 액세스하려면? (0) | 2019.11.22 |
[SCALA] 스칼라에서 병렬 파일 처리 (0) | 2019.11.22 |
[SCALA] 어떻게 기능의 요소의 공변 및 contravariant 위치를 확인하려면? (0) | 2019.11.22 |
[SCALA] 스칼라 SBT와 기업의 프록시 - SunCertPathBuilderException (0) | 2019.11.22 |