복붙노트

[HADOOP] 설정 및 분산 캐시에 접근 문제

HADOOP

설정 및 분산 캐시에 접근 문제

어떤 이유로 나는 새로운 API 작업 분산 캐시를 얻기위한 온라인 좋은 소스를 찾을 수 없습니다. 여기에 희망 누군가가 내가 잘못 무엇인지 설명 할 수 있습니다. 나의 현재 시도는 내가 온라인으로 발견 한 여러 가지의 뒤범벅의 일종이다.

이 프로그램은 K-가장 가까운 이웃 알고리즘을 실행하려고 시도합니다. 분산 캐시 기차 세트 기차 레이블을 보유하는 동안 입력 파일은, 테스트 데이터 세트입니다. 맵퍼는 테스트 데이터의 한 행을 분산 캐시 데이터의 모든 행에 비교하고 가장 유사한 행의 레이블을 반환해야합니다.

import java.net.URI;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KNNDriver extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.out.printf("Usage: %s [generic options] <input dir> <output dir>\n", getClass().getSimpleName());
            return -1;
        }

        Configuration conf = new Configuration();
        // conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "^");

        conf.setInt ("train_rows",1000);
        conf.setInt ("test_rows",1000);
        conf.setInt ("cols",612);
        DistributedCache.addCacheFile(new URI("cacheData/train_sample.csv"),conf);
        DistributedCache.addCacheFile(new URI("cacheData/train_labels.csv"),conf);

        Job job = new Job(conf);
        job.setJarByClass(KNNDriver.class); 
        job.setJobName("KNN");

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(KNNMapper.class);
        job.setReducerClass(KNNReducer.class);
        // job.setInputFormatClass(KeyValueTextInputFormat.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new KNNDriver(), args);
        System.exit(exitCode);
    }
}

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class KNNMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

  int[][] train_vals;
  int[] train_label_vals;
  int train_rows;
  int test_rows;
  int cols;

  @Override
  public void setup(Context context) throws IOException, InterruptedException {
      Configuration conf = context.getConfiguration();

      // Path[] cacheFiles = context.getLocalCacheFiles();

      int train_rows = conf.getInt("train_rows", 0);
      int test_rows = conf.getInt("test_rows", 0);
      int cols = conf.getInt("cols", 0);

      train_vals = new int[train_rows][cols];
      train_label_vals = new int[train_rows];

      // read train csv, parse, and store into 2d int array
      Scanner myScan;
        try {
            myScan = new Scanner(new File("train_sample.csv"));

            //Set the delimiter used in file
            myScan.useDelimiter("[,\r\n]+");

            //Get all tokens and store them in some data structure
            //I am just printing them

            System.out.println("myScan loaded for train_sample");

            for(int row = 0; row < train_rows; row++) {
                for(int col = 0; col < cols; col++) {
                    train_vals[row][col] = Integer.parseInt(myScan.next().toString());

                }
            }

            myScan.close();

        } catch (FileNotFoundException e) {
            System.out.print("Error: Train file not found.");
        }

    // read train_labels csv, parse, and store into 2d int array
        try {
            myScan = new Scanner(new File("train_labels.csv"));

            //Set the delimiter used in file
            myScan.useDelimiter("[,\r\n]+");

            //Get all tokens and store them in some data structure
            //I am just printing them

            System.out.println("myScan loaded for train_sample");


            for(int row = 0; row < train_rows; row++) {
                    train_label_vals[row] = Integer.parseInt(myScan.next().toString());
            }

            myScan.close();

        } catch (FileNotFoundException e) {
            System.out.print("Error: Train Labels file not found.");
        }
  }

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

        // setup() gave us train_vals & train_label_vals.
        // Each line in map() represents a test observation.  We iterate 
        // through every train_val row to find nearest L2 match, then
        // return a key/value pair of <observation #, 

        // convert from Text to String
        String line = value.toString();
        long distance;
        double best_distance = Double.POSITIVE_INFINITY;
        int col_num;

        int best_digit = -1;
        IntWritable rowId = null;
        int i;
        IntWritable rowNum;
        String[] pixels;

        // comma delimited files, split on commas
        // first we find the # of rows
        for (i = 0; i < train_rows; i++) {
            distance = 0;
            col_num = 0;
            pixels = line.split(",");
            rowId = new IntWritable(Integer.parseInt(pixels[0]));

            for (int j = 1; j < cols; j++) {
                distance += (Integer.parseInt(pixels[j]) - train_vals[i][j-1])^2;
            }
            if (distance < best_distance) {
                best_distance = distance;
                best_digit = train_label_vals[i];
            }
        }
        context.write(rowId, new IntWritable(best_digit));
  }
}

나는 그것이 무엇을 이해하지 못하는, 또는이 매퍼에 파일 데이터를 전송하는 방법 때문에 ... 문을 경로를 주석, 그러나 나는 몇 웹 사이트에 나와났습니다. 현재이 프로그램은 HDFS에 업로드에도 분산 캐시 데이터 세트를 찾을 수 없습니다.

해결법

  1. ==============================

    1.심볼릭 링크를 사용하려고 :

    심볼릭 링크를 사용하려고 :

    DistributedCache.createSymlink(conf);
    DistributedCache.addCacheFile(new URI("cacheData/train_sample.csv#train_sample.csv"),conf);
    DistributedCache.addCacheFile(new URI("cacheData/train_labels.csv#train_labels.csv"),conf);
    

    이것은 당신이 실제로 액세스하려고하는 이름으로 매퍼의 로컬 디렉토리에있는 파일을 사용할 수 있도록합니다.

  2. from https://stackoverflow.com/questions/21247085/problems-with-setting-up-and-accessing-distributed-cache by cc-by-sa and MIT license