[HADOOP] mongodb와 apache-spark를 연결할 때 쿼리하는 방법
HADOOPmongodb와 apache-spark를 연결할 때 쿼리하는 방법
나는 지금 Spark와 Mongodb를 실험하고 있는데, mongodb-hadoop connector를 사용하여 spark와 mongodb 통신을 연결한다. 다음은 https://github.com/plaa/mongo-spark의 예입니다.이 예제는 저에게 적합합니다.
그런 다음이 예제를 기반으로 6 백만 건의 비행 데이터 레코드가있는 https://github.com/10gen-interns/big-data-exploration의 더 큰 데이터 세트를 사용했습니다. mongodb 데이터 집합을 쿼리 한 다음 추가 처리를 수행하고 싶습니다.
항공편 데이터의 스키마는 https://gist.github.com/sweetieSong/6016700에 있습니다.
데이터 예제보기 :
{ "_id" : ObjectId( "51bf19c4ca69141e42ddd1f7" ),
"age" : 27,
"airTime" : 316,
"airlineId" : 19805,
"arrDelay" : -37,
"arrTime" : Date( 1336304580000 ),
"carrier" : "AA",
"carrierId" : "AA",
"crsArrTime" : Date( 1336306800000 ),
"crsDepTime" : Date( 1336294800000 ),
"crsElapsedTime" : 380,
"date" : Date( 1336262400000 ),
"dayOfMonth" : 6,
"dayOfWeek" : 7,
"depDelay" : -5,
"depTime" : Date( 1336294500000 ),
"destAirport" : "LAX",
"destAirportId" : 12892,
"destCity" : "Los Angeles, CA",
"destCityId" : 32575,
"destState" : "California",
"destStateId" : "CA",
"destWAC" : 91,
"distance" : 2475,
"diverted" : true,
"elapsedTime" : 348,
"flightNum" : 1,
"month" : 5,
"numDivAirportLandings" : 0,
"numFlights" : 1,
"origAirport" : "JFK",
"origAirportId" : 12478,
"origCity" : "New York, NY",
"origCityId" : 31703,
"origState" : "New York",
"origStateId" : "NY",
"origWAC" : 22,
"quarter" : 2,
"tailNum" : "N323AA",
"taxiIn" : 19,
"taxiOut" : 13,
"wheelsOff" : Date( 1336295280000 ),
"wheelsOn" : Date( 1336303440000 ),
"year" : 2012 }
내 스칼라 코드는
val sc = new SparkContext("local", "Scala Word Count")
val config = new Configuration()
config.set("mongo.input.uri", "mongodb://xx.xx.xx.xx:27017/flying.flights")
config.set("mongo.input.query","{destAirport: 'LAX'}");
//config.set("mongo.input.query","{_id.destAirport: 'LAX'}");
val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])
println ( "우리는 scala..count를 실행 중", mongoRDD.count ())
테스트 목적으로, destAirport 'LAX'에서 모든 레코드를 먼저 가져오고 싶습니다. 어떻게 쿼리가 유사한 지 알 수 없으므로 "{destAirport : 'LAX'}"및 "{destAirport : _id.destAirport : 'LAX'} "
응용 프로그램을 실행할 때 콘솔은 이러한 정보를 출력합니다
정보 MongoCollectionSplitter : 생성 된 분할 : min = { "_id": { "$ oid": "51bf29d8ca69141e42097d7f"}}, 최대 = { "_id": { "$ oid": "51bf29dfca69141e420991ad"}}
14/08/05 10:30:51 INFO Executor: Running task ID 751
14/08/05 10:30:51 INFO TaskSetManager: Finished TID 750 in 109 ms on localhost (progress: 751/1192)
14/08/05 10:30:51 INFO DAGScheduler: Completed ResultTask(0, 750)
14/08/05 10:30:51 INFO BlockManager: Found block broadcast_0 locally
14/08/05 10:30:51 INFO NewHadoopRDD: Input split: MongoInputSplit{URI=mongodb://178.62.35.36:27017/flying.flights, authURI=null, min={ "_id" : { "$oid" : "51bf2f95ca69141e421904e5"}}, max={ "_id" : { "$oid" : "51bf2f9dca69141e42191913"}}, query={ "_id.destAirport" : "LAX "}, sort={ }, fields={ }, notimeout=false}
14/08/05 10:30:51 INFO MongoRecordReader: Read 0.0 documents from:
14/08/05 10:30:51 INFO MongoRecordReader: MongoInputSplit{URI=mongodb://178.62.35.36:27017/flying.flights, authURI=null, min={ "_id" : { "$oid" : "51bf2f95ca69141e421904e5"}}, max={ "_id" : { "$oid" : "51bf2f9dca69141e42191913"}}, query={ "_id.destAirport" : "LAX "}, sort={ }, fields={ }, notimeout=false}
14/08/05 10:30:51 INFO Executor: Serialized size of result for 751 is 597
14/08/05 10:30:51 INFO Executor: Sending result for 751 directly to driver
14/08/05 10:30:51 INFO Executor: Finished task ID 751
쿼리가 무엇이든 상관없이 (심지어 쿼리를 설정하지 않아도) 스파크는 항상 1191 개의 작업을 실행합니다. 각 작업은 유사한 단어를 출력합니다. mongoRDD.count ()는 항상 0을 출력합니다.
내 첫 번째 질문은 올바른 쿼리가 무엇입니까?
게다가 이전에 mongodb-hadoop이하는 일은 mongodb가 먼저 모든 콜렉션을 질의 한 다음 결과를 다시 보내서 처리 할 수 있도록하는 것이라고 생각했습니다. 하지만 이제는 mongodb이 컬렉션을 여러 개로 나눠서 컬렉션의 작은 부분을 쿼리 한 다음 해당 부분의 결과를 시작으로 보냅니다. 그렇지?
해결법
-
==============================
1."올바른"쿼리가 없다고 생각합니다. 처리하고자하는 데이터를 기반으로 쿼리해야합니다.
"올바른"쿼리가 없다고 생각합니다. 처리하고자하는 데이터를 기반으로 쿼리해야합니다.
같은 문제가 발생했습니다.
MongoInputSplit.class가 주어지면 newAPIHadoopRDD는 분할을 계산할 때 쿼리를 고려하지 않는다고 생각합니다. 분할이 계산 된 후에 만 적용됩니다. 즉, 쿼리가 얼마나 가볍고 상관없이 분할 수는 동일하게 유지되며 컬렉션의 크기에 비례합니다.
newAPIHadoopRDD가 StandaloneMongoSplitter를 사용 중입니다. 이 클래스는 쿼리를 사용하여 분할 경계를 계산하지 않습니다. mongo의 내부 "splitVector"명령을 사용하고 있습니다. http://api.mongodb.org/internal/current/commands.html의 문서에서 쿼리를 고려하지 않은 것처럼 보입니다.
나는 좋은 해결책이 없다. 더 나은 방법은 쿼리를 계산 한 후에 mongo 컬렉션을 분할하는 것이지만,이 경우에는 스플리터를 구현해야합니다. 다음은이 문제에 대한 좋은 읽을 거리입니다. http://www.ikanow.com/how-well-does-mongodb-integrate-with-hadoop/
from https://stackoverflow.com/questions/25203325/how-to-query-when-connecting-mongodb-with-apache-spark by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 설치시 하이브가 작동하지 않습니다. (0) | 2019.07.26 |
---|---|
[HADOOP] $ HADOOP_HOME은 (는) 사용되지 않으며 Hadoop입니다. (0) | 2019.07.26 |
[HADOOP] 상태 1로 종료 된 Sqoop 하이브 (0) | 2019.07.26 |
[HADOOP] Hive에서 OpenCSVSerde를 사용할 때 모든 열이 문자열로 생성되는 이유는 무엇입니까? (0) | 2019.07.26 |
[HADOOP] YARN UNHYALTHY 노드 (0) | 2019.07.26 |