[HADOOP] Apache Nutch 2.3.1 map-reduce timeout occurred while updating the score
I have a 4-machine cluster, and Nutch 2.3.1 is configured to crawl a few websites. After crawling, I have to adjust the documents' scores a bit with a custom job. In that job, the mapper simply groups documents by their domain as the key. In the reducer, I sum their effective text bytes and compute the average; later I assign the log of the average bytes as the score. However, the reducer task ran for about 14 hours and then a timeout occurred, whereas Nutch's built-in jobs, e.g. updatedb, finish in 3 to 4 hours. Where is the problem? There may be some issue in my code (given below).
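To summarize the scoring logic, here is an illustrative fragment with made-up numbers, distilled from the reducer code below (variable names are mine, not the original ones):

// Illustration only: how the reducer below derives one domain's score.
// For every fetched page: urduBytes = totalBytes * urduPercent / 100; these are summed per domain.
int totalUrduBytes = 250000;   // sum of per-page Urdu bytes for one domain (made-up value)
int fetchedDocs = 120;         // pages of that domain that were actually fetched (made-up value)
float avgBytes = (float) totalUrduBytes / fetchedDocs;                     // ~2083.33
float domainScore = Math.round(Math.log10(avgBytes) * 100000f) / 100000f;  // ~3.3188, rounded to 5 decimals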
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
package org.apache.nutch.domain;
import java.io.IOException;
import java.util.ArrayList;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.gora.mapreduce.GoraReducer;
import org.apache.gora.store.DataStore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlStatus;
import org.apache.nutch.crawl.DbUpdaterJob;
import org.apache.nutch.storage.StorageUtils;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.TableUtil;
import org.slf4j.Logger;
/**
* Combines all WebPages with the same host key to create a Host object, with
* some statistics.
*/
public class DomainAnalysisReducer extends
GoraReducer<Text, WebPage, String, WebPage> {
public static final Logger LOG = DomainAnalysisJob.LOG;
public DataStore<String, WebPage> datastore;
// Regex for image and feed URLs
private static final String IMAGE_PATTERN = "(.*/)*.+\\.(png|jpg|gif|bmp|jpeg|json|PNG|JPG|GIF|BMP|svg|SVG|xml|XML|JSON).*";
private static final String FEED_PATTERN = ".*/feed/?$";
private static final float PENALITY_SCOER = -1.0f; // min score for image or feed urls
private static FilterURL urlFilter;
protected static float q1_ur_threshold = 500.0f;
protected static float q1_ur_docCount = 50;
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
Configuration conf = context.getConfiguration();
try {
datastore = StorageUtils.createWebStore(conf, String.class, WebPage.class);
urlFilter = new FilterURL(conf);
}
catch (ClassNotFoundException e) {
throw new IOException(e);
}
q1_ur_threshold = conf.getFloat("domain.queue.threshold.bytes", 500.0f);
q1_ur_docCount = conf.getInt("domain.queue.doc.count", 50);
LOG.info("Conf updated: Queue-bytes-threshold = " + q1_ur_threshold + " Queue-doc-threshold: " + q1_ur_docCount);
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
datastore.close();
}
@Override
protected void reduce(Text key, Iterable<WebPage> values, Context context)
throws IOException, InterruptedException {
ArrayList<String> Cache = new ArrayList<String>();
int doc_counter = 0;
int total_ur_bytes = 0;
for ( WebPage page : values ) {
// cache
String orig_key = page.getMarkers().get( DomainAnalysisJob.URL_ORIG_KEY ).toString();
Cache.add(orig_key);
// skip docs that were not fetched (i.e., URLs known only from links)
if ( page.getStatus() == CrawlStatus.STATUS_UNFETCHED ) {
continue;
}
doc_counter++;
int ur_score_int = 0;
int doc_ur_bytes = 0;
int doc_total_bytes = 0;
String ur_score_str = "0";
String langInfo_str = null;
// read page and find its Urdu score
langInfo_str = TableUtil.toString(page.getLangInfo());
if (langInfo_str == null) {
continue;
}
ur_score_str = TableUtil.toString(page.getUrduScore());
ur_score_int = Integer.parseInt(ur_score_str);
doc_total_bytes = Integer.parseInt( langInfo_str.split("&")[0] );
doc_ur_bytes = ( doc_total_bytes * ur_score_int) / 100; //Formula to find ur percentage
total_ur_bytes += doc_ur_bytes;
}
float avg_bytes = 0;
float log10 = 0;
if ( doc_counter > 0 && total_ur_bytes > 0) {
avg_bytes = (float) total_ur_bytes/doc_counter;
log10 = (float) Math.log10(avg_bytes);
log10 = (Math.round(log10 * 100000f)/100000f);
}
context.getCounter("DomainAnalysis", "DomainCount").increment(1);
// if average bytes and doc count are both above their thresholds, mark the domain as q1
boolean mark = false;
if ( avg_bytes >= q1_ur_threshold && doc_counter >= q1_ur_docCount ) {
mark = true;
LOG.info("Domain transposition: " + key.toString() + " total-docs: " + Cache.size()+ " fetched-docs: " + doc_counter +
" avg.-Urdue-bytes: " + avg_bytes + " score: " + log10);
}
// we can do the reverse i.e., move to q2 if less than threshold
for ( int index = 0; index < Cache.size(); index++) {
String Orig_key = Cache.get(index);
float doc_score = log10;
//penalize urls if they are feed or images
boolean isRelevant = ( ValidateImageURL(Orig_key) || ValidateFeedURL(Orig_key));
if (isRelevant) {
doc_score = PENALITY_SCOER;
}
// if blacklisted then apply penalty
if ( urlFilter.IsPageBlackListed( TableUtil.unreverseUrl(Orig_key) )) {
doc_score = PENALITY_SCOER;
}
WebPage page = datastore.get(Orig_key);
page.setScore(doc_score);
if (mark) {
page.getMarkers().put( DbUpdaterJob.Queue, DbUpdaterJob.Q1);
}
context.write(Orig_key, page);
}
}
public static boolean ValidateImageURL(String url) {
Pattern pattern = Pattern.compile(IMAGE_PATTERN);
Matcher matcher = pattern.matcher(url);
return matcher.matches();
}
public static boolean ValidateFeedURL(String url) {
Pattern pattern = Pattern.compile(FEED_PATTERN);
Matcher matcher = pattern.matcher(url);
return matcher.matches();
}
}
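The reducer's second loop does one random datastore.get() per cached URL, and the attempts shown further down are killed with "Timed out after 1800 secs", which is the 30-minute mapreduce.task.timeout set in the driver below. Hadoop terminates a task that neither reads input, writes output, nor updates its status within that window. The following is not a confirmed fix, just a hedged sketch of how that loop could report progress explicitly so that slow lookups alone do not trip the timeout:

// Sketch only (assumes the timeout is caused by long gaps without progress reports):
// ping the framework while iterating over the cached URLs.
for (int index = 0; index < Cache.size(); index++) {
  String origKey = Cache.get(index);
  WebPage page = datastore.get(origKey);   // random read against the store; can stall
  // ... same score and marker assignment as in the reducer above ...
  context.write(origKey, page);
  if (index % 1000 == 0) {
    context.progress();                                            // resets the task timeout clock
    context.setStatus("rescored " + index + "/" + Cache.size());   // visible in the task UI
  }
}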
The code of the driver class:
public void updateDomains(boolean buildLinkDb, int numTasks) throws Exception {
NutchJob job = NutchJob.getInstance(getConf(), "rankDomain-update");
job.getConfiguration().setInt("mapreduce.task.timeout", 1800000);
if ( numTasks < 1) {
job.setNumReduceTasks(job.getConfiguration().getInt(
"mapred.map.tasks", job.getNumReduceTasks()));
} else {
job.setNumReduceTasks(numTasks);
}
// === Map ===
DataStore<String, WebPage> pageStore = StorageUtils.createWebStore(
job.getConfiguration(), String.class, WebPage.class);
Query<String, WebPage> query = pageStore.newQuery();
query.setFields(StorageUtils.toStringArray(FIELDS)); // Note: pages without
// these fields are
// skipped
GoraMapper.initMapperJob(job, query, pageStore, Text.class, WebPage.class,
DomainAnalysisJob.Mapper.class, null, true);
// === Reduce ===
DataStore<String, WebPage> hostStore = StorageUtils.createWebStore(
job.getConfiguration(), String.class, WebPage.class);
GoraReducer.initReducerJob(job, hostStore, DomainAnalysisReducer.class);
job.waitForCompletion(true);
}
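A side note (my arithmetic, not from the original post): the driver sets mapreduce.task.timeout to 1800000 ms, i.e. 1800 seconds, which is exactly the value reported in the failure messages below, so the attempts are killed by this setting rather than by a cluster default. A quick, if crude, way to check whether the job is merely slow rather than hung would be to raise it, for example:

// Hypothetical tweak: allow up to 2 hours without progress before an attempt is killed.
// Raising the timeout only hides slowness; reporting progress (sketched above) is the cleaner route.
job.getConfiguration().setLong("mapreduce.task.timeout", 2L * 60 * 60 * 1000);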
And here are the reducer failure logs:
attempt_1549963404554_0102_r_000000_0 FAILED /default-rack/node3:8042 logs Tue Feb 19 15:53:23 +0500 2019 Tue Feb 19 20:45:11 +0500 2019 Tue Feb 19 20:45:11 +0500 2019 Tue Feb 19 20:45:11 +0500 2019 4hrs, 51mins, 47sec 0sec 0sec 4hrs, 51mins, 47sec AttemptID:attempt_1549963404554_0102_r_000000_0 Timed out after 1800 secs
attempt_1549963404554_0102_r_000000_1 FAILED /default-rack/node3:8042 logs Tue Feb 19 20:45:13 +0500 2019 Wed Feb 20 00:55:11 +0500 2019 Wed Feb 20 00:55:11 +0500 2019 Wed Feb 20 00:55:11 +0500 2019 4hrs, 9mins, 57sec 0sec 0sec 4hrs, 9mins, 57sec AttemptID:attempt_1549963404554_0102_r_000000_1 Timed out after 1800 secs
attempt_1549963404554_0102_r_000000_2 FAILED /default-rack/node3:8042 logs Wed Feb 20 00:55:13 +0500 2019 Wed Feb 20 05:16:11 +0500 2019 Wed Feb 20 05:16:11 +0500 2019 Wed Feb 20 05:16:11 +0500 2019 4hrs, 20mins, 57sec 0sec 0sec 4hrs, 20mins, 57sec AttemptID:attempt_1549963404554_0102_r_000000_2 Timed out after 1800 secs
Solution
From https://stackoverflow.com/questions/54778616/apache-nutch-2-3-1-map-reduce-timeout-occurred-while-updating-the-score, under cc-by-sa and MIT license.