
[HADOOP] Apache Nutch 2.3.1: map-reduce timeout occurred while updating the score

I have a cluster of 4 machines, and Nutch 2.3.1 is configured to crawl a few websites. After crawling, I have to change the documents' scores a little with a custom job. In that job, the mapper groups documents by domain, i.e. the domain is the key. In the reducer, I sum their effective text bytes and compute the average, and then I assign the log of the average bytes as the score. However, the reducer task ran for about 14 hours and then a timeout occurred, while Nutch's built-in jobs, e.g. updatedb, complete in 3 to 4 hours. Where is the problem? Is there something wrong in my code (given below)?
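To make the scoring formula concrete before the full job code, here is a minimal standalone sketch (the byte counts are made-up numbers for illustration, not from the actual crawl): the per-domain score is log10 of the average effective Urdu bytes across the fetched documents, rounded to five decimal places exactly as in the reducer below.

public class ScoreExample {
  public static void main(String[] args) {
    int docCounter = 50;        // fetched documents in the domain (assumed)
    int totalUrBytes = 100000;  // summed effective Urdu bytes (assumed)

    float avgBytes = (float) totalUrBytes / docCounter;  // 2000.0
    float log10 = (float) Math.log10(avgBytes);          // ~3.30103
    log10 = Math.round(log10 * 100000f) / 100000f;       // same rounding as the reducer

    System.out.println("avg-bytes: " + avgBytes + " score: " + log10);
  }
}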

/*******************************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * 
 *     http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 ******************************************************************************/
package org.apache.nutch.domain;

import java.io.IOException;
import java.util.ArrayList;
import java.util.regex.Pattern;

import org.apache.gora.mapreduce.GoraReducer;
import org.apache.gora.store.DataStore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlStatus;
import org.apache.nutch.crawl.DbUpdaterJob;
import org.apache.nutch.storage.StorageUtils;
import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.TableUtil;
import org.slf4j.Logger;

/**
 * Combines all WebPages with the same domain key and computes per-domain
 * statistics that are used to assign a score back to each page.
 */
public class DomainAnalysisReducer extends
    GoraReducer<Text, WebPage, String, WebPage> {

  public static final Logger LOG = DomainAnalysisJob.LOG;

  public DataStore<String, WebPage> datastore;

  // Regexes for image/asset and feed URLs, compiled once rather than on
  // every call to the Validate*URL helpers below
  private static final Pattern IMAGE_PATTERN = Pattern.compile(
      "(.*/)*.+\\.(png|jpg|gif|bmp|jpeg|json|PNG|JPG|GIF|BMP|svg|SVG|xml|XML|JSON).*");
  private static final Pattern FEED_PATTERN = Pattern.compile(".*/feed/?$");

  private static final float PENALTY_SCORE = -1.0f;  // score assigned to image or feed URLs

  private static FilterURL urlFilter;
  protected static float q1_ur_threshold = 500.0f;
  protected static int q1_ur_docCount = 50;

  @Override
  protected void setup(Context context) throws IOException,
      InterruptedException {
    Configuration conf = context.getConfiguration();
    try {
      datastore = StorageUtils.createWebStore(conf, String.class, WebPage.class);
      urlFilter = new FilterURL(conf);
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
    q1_ur_threshold = conf.getFloat("domain.queue.threshold.bytes", 500.0f);
    q1_ur_docCount = conf.getInt("domain.queue.doc.count", 50);
    LOG.info("Conf updated: Queue-bytes-threshold = " + q1_ur_threshold
        + " Queue-doc-threshold: " + q1_ur_docCount);
  }

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    datastore.close();
  }

  @Override
  protected void reduce(Text key, Iterable<WebPage> values, Context context)
      throws IOException, InterruptedException {

    ArrayList<String> cache = new ArrayList<String>();

    int doc_counter = 0;
    int total_ur_bytes = 0;

    for (WebPage page : values) {

      // remember the original key of every page in this domain
      String orig_key = page.getMarkers().get(DomainAnalysisJob.URL_ORIG_KEY).toString();
      cache.add(orig_key);

      // skip documents that were not fetched, e.g. link-only URLs
      if (page.getStatus() == CrawlStatus.STATUS_UNFETCHED) {
        continue;
      }

      doc_counter++;
      int ur_score_int = 0;
      int doc_ur_bytes = 0;
      int doc_total_bytes = 0;
      String ur_score_str = "0";
      String langInfo_str = null;

      // read the page and find its Urdu score
      langInfo_str = TableUtil.toString(page.getLangInfo());
      if (langInfo_str == null) {
        continue;
      }
      ur_score_str = TableUtil.toString(page.getUrduScore());
      ur_score_int = Integer.parseInt(ur_score_str);
      doc_total_bytes = Integer.parseInt(langInfo_str.split("&")[0]);
      // effective Urdu bytes = total bytes * Urdu percentage
      doc_ur_bytes = (doc_total_bytes * ur_score_int) / 100;

      total_ur_bytes += doc_ur_bytes;
    }

    float avg_bytes = 0;
    float log10 = 0;
    if (doc_counter > 0 && total_ur_bytes > 0) {
      avg_bytes = (float) total_ur_bytes / doc_counter;
      log10 = (float) Math.log10(avg_bytes);
      log10 = Math.round(log10 * 100000f) / 100000f;
    }

    context.getCounter("DomainAnalysis", "DomainCount").increment(1);

    // if both the average bytes and the doc count exceed their thresholds, mark the domain as q1
    boolean mark = false;
    if (avg_bytes >= q1_ur_threshold && doc_counter >= q1_ur_docCount) {
      mark = true;
      LOG.info("Domain transposition: " + key.toString() + " total-docs: " + cache.size()
          + " fetched-docs: " + doc_counter + " avg-Urdu-bytes: " + avg_bytes
          + " score: " + log10);
    }
    // we could do the reverse, i.e. move to q2 if below the threshold

    for (int index = 0; index < cache.size(); index++) {

      String orig_key = cache.get(index);
      float doc_score = log10;

      // penalize URLs that are feeds or images
      boolean penalize = ValidateImageURL(orig_key) || ValidateFeedURL(orig_key);
      if (penalize) {
        doc_score = PENALTY_SCORE;
      }
      // if blacklisted, apply the same penalty
      if (urlFilter.IsPageBlackListed(TableUtil.unreverseUrl(orig_key))) {
        doc_score = PENALTY_SCORE;
      }

      // random read back from the store, one get() per URL of the domain
      WebPage page = datastore.get(orig_key);
      if (page == null) {  // defensive: skip keys that are no longer in the store
        continue;
      }
      page.setScore(doc_score);

      if (mark) {
        page.getMarkers().put(DbUpdaterJob.Queue, DbUpdaterJob.Q1);
      }

      context.write(orig_key, page);
    }
  }
  public static boolean ValidateImageURL(String url) {
    return IMAGE_PATTERN.matcher(url).matches();
  }

  public static boolean ValidateFeedURL(String url) {
    return FEED_PATTERN.matcher(url).matches();
  }
}
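One suspicion worth stating explicitly: the second loop in reduce() performs one random datastore.get() per URL of the domain and, between writes, never reports status, so Hadoop kills an attempt whose single reduce() call stays silent longer than mapreduce.task.timeout. Below is a minimal sketch of the standard countermeasure, calling context.progress() periodically inside long-running loops. This is my assumption about the cause, not part of the posted job, and ProgressAwareReducer is a made-up name for illustration.

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Minimal sketch: renew the task's liveness lease during a long reduce call.
public class ProgressAwareReducer extends Reducer<Text, Text, Text, Text> {

  @Override
  protected void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    long seen = 0;
    for (Text value : values) {
      // ... expensive per-record work, e.g. a random datastore lookup ...
      if (++seen % 1000 == 0) {
        context.progress();  // tells the framework the attempt is still alive
        context.setStatus(key + ": processed " + seen + " records");
      }
    }
  }
}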

The code of the driver class:

public void updateDomains(boolean buildLinkDb, int numTasks) throws Exception {

    NutchJob job = NutchJob.getInstance(getConf(), "rankDomain-update");

    // 1800000 ms = 1800 s = 30 minutes
    job.getConfiguration().setInt("mapreduce.task.timeout", 1800000);

    if (numTasks < 1) {
        job.setNumReduceTasks(job.getConfiguration().getInt(
            "mapred.map.tasks", job.getNumReduceTasks()));
    } else {
        job.setNumReduceTasks(numTasks);
    }

    // === Map ===
    DataStore<String, WebPage> pageStore = StorageUtils.createWebStore(
        job.getConfiguration(), String.class, WebPage.class);
    Query<String, WebPage> query = pageStore.newQuery();
    query.setFields(StorageUtils.toStringArray(FIELDS)); // Note: pages without
                                                         // these fields are
                                                         // skipped
    GoraMapper.initMapperJob(job, query, pageStore, Text.class, WebPage.class,
        DomainAnalysisJob.Mapper.class, null, true);

    // === Reduce ===
    DataStore<String, WebPage> hostStore = StorageUtils.createWebStore(
        job.getConfiguration(), String.class, WebPage.class);
    GoraReducer.initReducerJob(job, hostStore, DomainAnalysisReducer.class);

    job.waitForCompletion(true);
}
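For reference, the driver pins mapreduce.task.timeout to 1800000 ms, which is exactly the "1800 secs" after which the attempts below are killed. Per the Hadoop documentation the value is in milliseconds and 0 disables the timeout entirely, so it could be raised or switched off in the driver above, although that only hides a reduce that makes no visible progress rather than fixing it:

    // Raise or disable the liveness timeout (milliseconds; 0 disables it).
    // This hides the symptom instead of fixing a silent long-running reduce.
    job.getConfiguration().setInt("mapreduce.task.timeout", 0);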

And then the reducer attempts failed with the following log. Note that each attempt ran for four to five hours before being killed, and "Timed out after 1800 secs" matches the mapreduce.task.timeout of 1800000 ms set in the driver: each attempt went 30 minutes without writing output or updating its status.

attempt_1549963404554_0102_r_000000_0   FAILED      /default-rack/node3:8042    logs    Tue Feb 19 15:53:23 +0500 2019  Tue Feb 19 20:45:11 +0500 2019  Tue Feb 19 20:45:11 +0500 2019  Tue Feb 19 20:45:11 +0500 2019  4hrs, 51mins, 47sec 0sec    0sec    4hrs, 51mins, 47sec AttemptID:attempt_1549963404554_0102_r_000000_0 Timed out after 1800 secs
attempt_1549963404554_0102_r_000000_1   FAILED      /default-rack/node3:8042    logs    Tue Feb 19 20:45:13 +0500 2019  Wed Feb 20 00:55:11 +0500 2019  Wed Feb 20 00:55:11 +0500 2019  Wed Feb 20 00:55:11 +0500 2019  4hrs, 9mins, 57sec  0sec    0sec    4hrs, 9mins, 57sec  AttemptID:attempt_1549963404554_0102_r_000000_1 Timed out after 1800 secs
attempt_1549963404554_0102_r_000000_2   FAILED      /default-rack/node3:8042    logs    Wed Feb 20 00:55:13 +0500 2019  Wed Feb 20 05:16:11 +0500 2019  Wed Feb 20 05:16:11 +0500 2019  Wed Feb 20 05:16:11 +0500 2019  4hrs, 20mins, 57sec 0sec    0sec    4hrs, 20mins, 57sec AttemptID:attempt_1549963404554_0102_r_000000_2 Timed out after 1800 secs

Solution

    From https://stackoverflow.com/questions/54778616/apache-nutch-2-3-1-map-reduce-timeout-occurred-while-updating-the-score, under CC BY-SA and the MIT license.