복붙노트

[HADOOP] mongodb에 연결하는 동안 예외가 발생 함

HADOOP

mongodb에 연결하는 동안 예외가 발생 함

MongoDB를 입력 RDD로 사용하려고 시도하는 중에 org.bson.BasicBSONDecoder._decode에서 "java.lang.IllegalStateException : not ready"가 발생합니다.

Configuration conf = new Configuration();
conf.set("mongo.input.uri", "mongodb://127.0.0.1:27017/test.input");

JavaPairRDD<Object, BSONObject> rdd = sc.newAPIHadoopRDD(conf, MongoInputFormat.class, Object.class, BSONObject.class);

System.out.println(rdd.count());

내가 얻는 예외는 : 14/08/06 09:49:57 정보 rdd.NewHadoopRDD : 입력 분할 :

MongoInputSplit{URI=mongodb://127.0.0.1:27017/test.input, authURI=null, min={ "_id" : { "$oid" : "53df98d7e4b0a67992b31f8d"}}, max={ "_id" : { "$oid" : "53df98d7e4b0a67992b331b8"}}, query={ }, sort={ }, fields={ }, notimeout=false} 14/08/06 09:49:57 
WARN scheduler.TaskSetManager: Loss was due to java.lang.IllegalStateException 
java.lang.IllegalStateException: not ready
            at org.bson.BasicBSONDecoder._decode(BasicBSONDecoder.java:139)
            at org.bson.BasicBSONDecoder.decode(BasicBSONDecoder.java:123)
            at com.mongodb.hadoop.input.MongoInputSplit.readFields(MongoInputSplit.java:185)
            at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
            at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
            at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:42)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:88)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
            at java.lang.reflect.Method.invoke(Method.java:618)
            at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1089)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1962)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1867)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
            at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2059)
            at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1984)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1867)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:420)
            at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:147)
            at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1906)
            at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1865)
            at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1419)
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:420)
            at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
            at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:165)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1156)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:626)
            at java.lang.Thread.run(Thread.java:804)

모든 프로그램 출력은 여기에 있습니다.

환경:

해결법

  1. ==============================

    1.mongodb-hadoop은 core / src / main / java / com / mongodb / hadoop / input / MongoInputSplit.java의 BSON 인코더 / 디코더 인스턴스에 "정적"수정자를 가지고 있습니다. Spark이 다중 스레드 모드에서 실행될 때 모든 스레드는 동일한 인코더 / 디코더 인스턴스를 사용하여 시도하고 디시 리얼 라이즈합니다. 결과적으로 나쁜 결과가 나옵니다.

    mongodb-hadoop은 core / src / main / java / com / mongodb / hadoop / input / MongoInputSplit.java의 BSON 인코더 / 디코더 인스턴스에 "정적"수정자를 가지고 있습니다. Spark이 다중 스레드 모드에서 실행될 때 모든 스레드는 동일한 인코더 / 디코더 인스턴스를 사용하여 시도하고 디시 리얼 라이즈합니다. 결과적으로 나쁜 결과가 나옵니다.

    여기 내 github에 패치 (업스트림 요청을 업스트림에 제출 함)

    이제 파이썬에서 8 코어 멀티 스레드 Spark-> mongo collection count ()를 실행할 수 있습니다!

  2. ==============================

    2.나는 같은 문제를 발견했다. 해결 방법으로 새로운 APIHadoopRDD 방법을 포기하고 문서 ID에 간격을 정의하고 각 파티션을 병렬로로드하여 병렬로드 메커니즘을 구현했습니다. 아이디어는 MongoDB Java 드라이버를 사용하여 다음의 mongo 쉘 코드를 구현하는 것입니다.

    나는 같은 문제를 발견했다. 해결 방법으로 새로운 APIHadoopRDD 방법을 포기하고 문서 ID에 간격을 정의하고 각 파티션을 병렬로로드하여 병렬로드 메커니즘을 구현했습니다. 아이디어는 MongoDB Java 드라이버를 사용하여 다음의 mongo 쉘 코드를 구현하는 것입니다.

    // Compute min and max id of the collection
    db.coll.find({},{_id:1}).sort({_id: 1}).limit(1)
       .forEach(function(doc) {min_id = doc._id})
    db.coll.find({},{_id:1}).sort({_id: -1}).limit(1)
       .forEach(function(doc) {max_id = doc._id})
    
    // Compute id ranges
    curr_id = min_id
    ranges = []
    page_size = 1000
    // to avoid the use of Comparable in the Java translation
    while(! curr_id.equals(max_id)) {
        prev_id = curr_id    
        db.coll.find({_id : {$gte : curr_id}}, {_id : 1})
               .sort({_id: 1})
               .limit(page_size + 1)
               .forEach(function(doc) {
                           curr_id = doc._id
                       })
        ranges.push([prev_id, curr_id])
    }
    

    이제 범위를 사용하여 수집 단편에 대한 빠른 쿼리를 수행 할 수 있습니다. 콜렉션의 마지막 문서가 손실되는 것을 피하기 위해 마지막 프래그먼트는 min 제약 조건과 다르게 다뤄야합니다.

    db.coll.find({_id : {$gte : ranges[1][0], $lt : ranges[1][1]}})
    db.coll.find({_id : {$gte : ranges[2][0]}})
    

    간단하게 Range POJO에 대한 Java 메소드 'LinkedList computeIdRanges (DBCollection coll, int rangeSize)'를 구현 한 다음 컬렉션을 평행 화하고 flatMapToPair로 변환하여 newAPIHadoopRDD에서 반환되는 RDD와 유사한 RDD를 생성합니다.

    List<Range> ranges = computeIdRanges(coll, DEFAULT_RANGE_SIZE);
    JavaRDD<Range> parallelRanges = sparkContext.parallelize(ranges, ranges.size());
    JavaPairRDD<Object, BSONObject> mongoRDD = 
       parallelRanges.flatMapToPair(
         new PairFlatMapFunction<MongoDBLoader.Range, Object, BSONObject>() {
           ...
           BasicDBObject query = range.max.isPresent() ?
               new BasicDBObject("_id", new BasicDBObject("$gte", range.min)
                                .append("$lt", range.max.get()))
             : new BasicDBObject("_id", new BasicDBObject("$gte", range.min));
           ...
    

    범위의 크기 및 병렬화에 사용 된 슬라이스 수를 사용하여 병렬 처리의 세분성을 제어 할 수 있습니다.

    도움이되기를 바랍니다.

    인사말!

    후안 로드리게스 호르 탈라

  3. ==============================

    3.mongorestore를 사용하여 BSON 파일을 가져온 후 동일한 예외 조합이있었습니다. db.collecion.reIndex ()를 호출하면 문제가 해결되었습니다.

    mongorestore를 사용하여 BSON 파일을 가져온 후 동일한 예외 조합이있었습니다. db.collecion.reIndex ()를 호출하면 문제가 해결되었습니다.

  4. from https://stackoverflow.com/questions/25226515/exception-while-connecting-to-mongodb-in-spark by cc-by-sa and MIT license