[HADOOP] mongodb에 연결하는 동안 예외가 발생 함
HADOOPmongodb에 연결하는 동안 예외가 발생 함
MongoDB를 입력 RDD로 사용하려고 시도하는 중에 org.bson.BasicBSONDecoder._decode에서 "java.lang.IllegalStateException : not ready"가 발생합니다.
Configuration conf = new Configuration();
conf.set("mongo.input.uri", "mongodb://127.0.0.1:27017/test.input");
JavaPairRDD<Object, BSONObject> rdd = sc.newAPIHadoopRDD(conf, MongoInputFormat.class, Object.class, BSONObject.class);
System.out.println(rdd.count());
내가 얻는 예외는 : 14/08/06 09:49:57 정보 rdd.NewHadoopRDD : 입력 분할 :
MongoInputSplit{URI=mongodb://127.0.0.1:27017/test.input, authURI=null, min={ "_id" : { "$oid" : "53df98d7e4b0a67992b31f8d"}}, max={ "_id" : { "$oid" : "53df98d7e4b0a67992b331b8"}}, query={ }, sort={ }, fields={ }, notimeout=false} 14/08/06 09:49:57
WARN scheduler.TaskSetManager: Loss was due to java.lang.IllegalStateException
java.lang.IllegalStateException: not ready
at org.bson.BasicBSONDecoder._decode(BasicBSONDecoder.java:139)
at org.bson.BasicBSONDecoder.decode(BasicBSONDecoder.java:123)
at com.mongodb.hadoop.input.MongoInputSplit.readFields(MongoInputSplit.java:185)
at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:42)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:88)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
at java.lang.reflect.Method.invoke(Method.java:618)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1089)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1962)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1867)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2059)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1984)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1867)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:420)
at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:147)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1906)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1865)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:420)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:165)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1156)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:626)
at java.lang.Thread.run(Thread.java:804)
모든 프로그램 출력은 여기에 있습니다.
환경:
해결법
-
==============================
1.mongodb-hadoop은 core / src / main / java / com / mongodb / hadoop / input / MongoInputSplit.java의 BSON 인코더 / 디코더 인스턴스에 "정적"수정자를 가지고 있습니다. Spark이 다중 스레드 모드에서 실행될 때 모든 스레드는 동일한 인코더 / 디코더 인스턴스를 사용하여 시도하고 디시 리얼 라이즈합니다. 결과적으로 나쁜 결과가 나옵니다.
mongodb-hadoop은 core / src / main / java / com / mongodb / hadoop / input / MongoInputSplit.java의 BSON 인코더 / 디코더 인스턴스에 "정적"수정자를 가지고 있습니다. Spark이 다중 스레드 모드에서 실행될 때 모든 스레드는 동일한 인코더 / 디코더 인스턴스를 사용하여 시도하고 디시 리얼 라이즈합니다. 결과적으로 나쁜 결과가 나옵니다.
여기 내 github에 패치 (업스트림 요청을 업스트림에 제출 함)
이제 파이썬에서 8 코어 멀티 스레드 Spark-> mongo collection count ()를 실행할 수 있습니다!
-
==============================
2.나는 같은 문제를 발견했다. 해결 방법으로 새로운 APIHadoopRDD 방법을 포기하고 문서 ID에 간격을 정의하고 각 파티션을 병렬로로드하여 병렬로드 메커니즘을 구현했습니다. 아이디어는 MongoDB Java 드라이버를 사용하여 다음의 mongo 쉘 코드를 구현하는 것입니다.
나는 같은 문제를 발견했다. 해결 방법으로 새로운 APIHadoopRDD 방법을 포기하고 문서 ID에 간격을 정의하고 각 파티션을 병렬로로드하여 병렬로드 메커니즘을 구현했습니다. 아이디어는 MongoDB Java 드라이버를 사용하여 다음의 mongo 쉘 코드를 구현하는 것입니다.
// Compute min and max id of the collection db.coll.find({},{_id:1}).sort({_id: 1}).limit(1) .forEach(function(doc) {min_id = doc._id}) db.coll.find({},{_id:1}).sort({_id: -1}).limit(1) .forEach(function(doc) {max_id = doc._id}) // Compute id ranges curr_id = min_id ranges = [] page_size = 1000 // to avoid the use of Comparable in the Java translation while(! curr_id.equals(max_id)) { prev_id = curr_id db.coll.find({_id : {$gte : curr_id}}, {_id : 1}) .sort({_id: 1}) .limit(page_size + 1) .forEach(function(doc) { curr_id = doc._id }) ranges.push([prev_id, curr_id]) }
이제 범위를 사용하여 수집 단편에 대한 빠른 쿼리를 수행 할 수 있습니다. 콜렉션의 마지막 문서가 손실되는 것을 피하기 위해 마지막 프래그먼트는 min 제약 조건과 다르게 다뤄야합니다.
db.coll.find({_id : {$gte : ranges[1][0], $lt : ranges[1][1]}}) db.coll.find({_id : {$gte : ranges[2][0]}})
간단하게 Range POJO에 대한 Java 메소드 'LinkedList computeIdRanges (DBCollection coll, int rangeSize)'를 구현 한 다음 컬렉션을 평행 화하고 flatMapToPair로 변환하여 newAPIHadoopRDD에서 반환되는 RDD와 유사한 RDD를 생성합니다.
List<Range> ranges = computeIdRanges(coll, DEFAULT_RANGE_SIZE); JavaRDD<Range> parallelRanges = sparkContext.parallelize(ranges, ranges.size()); JavaPairRDD<Object, BSONObject> mongoRDD = parallelRanges.flatMapToPair( new PairFlatMapFunction<MongoDBLoader.Range, Object, BSONObject>() { ... BasicDBObject query = range.max.isPresent() ? new BasicDBObject("_id", new BasicDBObject("$gte", range.min) .append("$lt", range.max.get())) : new BasicDBObject("_id", new BasicDBObject("$gte", range.min)); ...
범위의 크기 및 병렬화에 사용 된 슬라이스 수를 사용하여 병렬 처리의 세분성을 제어 할 수 있습니다.
도움이되기를 바랍니다.
인사말!
후안 로드리게스 호르 탈라
-
==============================
3.mongorestore를 사용하여 BSON 파일을 가져온 후 동일한 예외 조합이있었습니다. db.collecion.reIndex ()를 호출하면 문제가 해결되었습니다.
mongorestore를 사용하여 BSON 파일을 가져온 후 동일한 예외 조합이있었습니다. db.collecion.reIndex ()를 호출하면 문제가 해결되었습니다.
from https://stackoverflow.com/questions/25226515/exception-while-connecting-to-mongodb-in-spark by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] AWR EMR의 YARN 로그 집계 - UnsupportedFileSystemException (0) | 2019.07.27 |
---|---|
[HADOOP] Hadoop 파일 단위 블록 크기 (0) | 2019.07.27 |
[HADOOP] 하이브 메타 스토어 연결을위한 Hive-Site.xml 파일의 구성 설정 방법 (0) | 2019.07.27 |
[HADOOP] hadoop : 파일에 로컬 작업이 없습니까? (0) | 2019.07.27 |
[HADOOP] Hadoop 0.20.2 Eclipse 플러그인이 제대로 작동하지 않습니다. 'Hadoop에서 실행'할 수 없습니다. (0) | 2019.07.27 |