복붙노트

[HADOOP] mongodb와 apache-spark를 연결할 때 쿼리하는 방법

HADOOP

mongodb와 apache-spark를 연결할 때 쿼리하는 방법

나는 지금 Spark와 Mongodb를 실험하고 있는데, mongodb-hadoop connector를 사용하여 spark와 mongodb 통신을 연결한다. 다음은 https://github.com/plaa/mongo-spark의 예입니다.이 예제는 저에게 적합합니다.

그런 다음이 예제를 기반으로 6 백만 건의 비행 데이터 레코드가있는 https://github.com/10gen-interns/big-data-exploration의 더 큰 데이터 세트를 사용했습니다. mongodb 데이터 집합을 쿼리 한 다음 추가 처리를 수행하고 싶습니다.

항공편 데이터의 스키마는 https://gist.github.com/sweetieSong/6016700에 있습니다.

데이터 예제보기 :

{ "_id" : ObjectId( "51bf19c4ca69141e42ddd1f7" ),
  "age" : 27,
  "airTime" : 316,
  "airlineId" : 19805,
  "arrDelay" : -37,
  "arrTime" : Date( 1336304580000 ),
  "carrier" : "AA",
  "carrierId" : "AA",
  "crsArrTime" : Date( 1336306800000 ),
  "crsDepTime" : Date( 1336294800000 ),
  "crsElapsedTime" : 380,
  "date" : Date( 1336262400000 ),
  "dayOfMonth" : 6,
  "dayOfWeek" : 7,
  "depDelay" : -5,
  "depTime" : Date( 1336294500000 ),
  "destAirport" : "LAX",
  "destAirportId" : 12892,
  "destCity" : "Los Angeles, CA",
  "destCityId" : 32575,
  "destState" : "California",
  "destStateId" : "CA",
  "destWAC" : 91,
  "distance" : 2475,
  "diverted" : true,
  "elapsedTime" : 348,
  "flightNum" : 1,
  "month" : 5,
  "numDivAirportLandings" : 0,
  "numFlights" : 1,
  "origAirport" : "JFK",
  "origAirportId" : 12478,
  "origCity" : "New York, NY",
  "origCityId" : 31703,
  "origState" : "New York",
  "origStateId" : "NY",
  "origWAC" : 22,
  "quarter" : 2,
  "tailNum" : "N323AA",
  "taxiIn" : 19,
  "taxiOut" : 13,
  "wheelsOff" : Date( 1336295280000 ),
  "wheelsOn" : Date( 1336303440000 ),
  "year" : 2012 }

내 스칼라 코드는

val sc = new SparkContext("local", "Scala Word Count")

val config = new Configuration()
config.set("mongo.input.uri", "mongodb://xx.xx.xx.xx:27017/flying.flights")
config.set("mongo.input.query","{destAirport: 'LAX'}");
//config.set("mongo.input.query","{_id.destAirport: 'LAX'}");

val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])

println ( "우리는 scala..count를 실행 중", mongoRDD.count ())

테스트 목적으로, destAirport 'LAX'에서 모든 레코드를 먼저 가져오고 싶습니다. 어떻게 쿼리가 유사한 지 알 수 없으므로 "{destAirport : 'LAX'}"및 "{destAirport : _id.destAirport : 'LAX'} "

응용 프로그램을 실행할 때 콘솔은 이러한 정보를 출력합니다

정보 MongoCollectionSplitter : 생성 된 분할 : min = { "_id": { "$ oid": "51bf29d8ca69141e42097d7f"}}, 최대 = { "_id": { "$ oid": "51bf29dfca69141e420991ad"}}

14/08/05 10:30:51 INFO Executor: Running task ID 751
14/08/05 10:30:51 INFO TaskSetManager: Finished TID 750 in 109 ms on localhost (progress: 751/1192)
14/08/05 10:30:51 INFO DAGScheduler: Completed ResultTask(0, 750)
14/08/05 10:30:51 INFO BlockManager: Found block broadcast_0 locally
14/08/05 10:30:51 INFO NewHadoopRDD: Input split: MongoInputSplit{URI=mongodb://178.62.35.36:27017/flying.flights, authURI=null, min={ "_id" : { "$oid" : "51bf2f95ca69141e421904e5"}}, max={ "_id" : { "$oid" : "51bf2f9dca69141e42191913"}}, query={ "_id.destAirport" : "LAX "}, sort={ }, fields={ }, notimeout=false}
14/08/05 10:30:51 INFO MongoRecordReader: Read 0.0 documents from:
14/08/05 10:30:51 INFO MongoRecordReader: MongoInputSplit{URI=mongodb://178.62.35.36:27017/flying.flights, authURI=null, min={ "_id" : { "$oid" : "51bf2f95ca69141e421904e5"}}, max={ "_id" : { "$oid" : "51bf2f9dca69141e42191913"}}, query={ "_id.destAirport" : "LAX "}, sort={ }, fields={ }, notimeout=false}
14/08/05 10:30:51 INFO Executor: Serialized size of result for 751 is 597
14/08/05 10:30:51 INFO Executor: Sending result for 751 directly to driver
14/08/05 10:30:51 INFO Executor: Finished task ID 751

쿼리가 무엇이든 상관없이 (심지어 쿼리를 설정하지 않아도) 스파크는 항상 1191 개의 작업을 실행합니다. 각 작업은 유사한 단어를 출력합니다. mongoRDD.count ()는 항상 0을 출력합니다.

내 첫 번째 질문은 올바른 쿼리가 무엇입니까?

게다가 이전에 mongodb-hadoop이하는 일은 mongodb가 먼저 모든 콜렉션을 질의 한 다음 결과를 다시 보내서 처리 할 수 ​​있도록하는 것이라고 생각했습니다. 하지만 이제는 mongodb이 컬렉션을 여러 개로 나눠서 컬렉션의 작은 부분을 쿼리 한 다음 해당 부분의 결과를 시작으로 보냅니다. 그렇지?

해결법

  1. ==============================

    1."올바른"쿼리가 없다고 생각합니다. 처리하고자하는 데이터를 기반으로 쿼리해야합니다.

    "올바른"쿼리가 없다고 생각합니다. 처리하고자하는 데이터를 기반으로 쿼리해야합니다.

    같은 문제가 발생했습니다.

    MongoInputSplit.class가 주어지면 newAPIHadoopRDD는 분할을 계산할 때 쿼리를 고려하지 않는다고 생각합니다. 분할이 계산 된 후에 만 ​​적용됩니다. 즉, 쿼리가 얼마나 가볍고 상관없이 분할 수는 동일하게 유지되며 컬렉션의 크기에 비례합니다.

    newAPIHadoopRDD가 StandaloneMongoSplitter를 사용 중입니다. 이 클래스는 쿼리를 사용하여 분할 경계를 계산하지 않습니다. mongo의 내부 "splitVector"명령을 사용하고 있습니다. http://api.mongodb.org/internal/current/commands.html의 문서에서 쿼리를 고려하지 않은 것처럼 보입니다.

    나는 좋은 해결책이 없다. 더 나은 방법은 쿼리를 계산 한 후에 mongo 컬렉션을 분할하는 것이지만,이 경우에는 스플리터를 구현해야합니다. 다음은이 문제에 대한 좋은 읽을 거리입니다. http://www.ikanow.com/how-well-does-mongodb-integrate-with-hadoop/

  2. from https://stackoverflow.com/questions/25203325/how-to-query-when-connecting-mongodb-with-apache-spark by cc-by-sa and MIT license