복붙노트

[HADOOP] Hadoop DistributedCache는 더 이상 사용되지 않습니다 - 선호하는 API는 무엇입니까?

HADOOP

Hadoop DistributedCache는 더 이상 사용되지 않습니다 - 선호하는 API는 무엇입니까?

내지도 작업에는 분산 캐시를 통해 배포하려는 일부 구성 데이터가 필요합니다.

Hadoop MapReduce 튜토리얼은 대략 다음과 같이 DistributedCache 클래스의 사용법을 보여줍니다.

// In the driver
JobConf conf = new JobConf(getConf(), WordCount.class);
...
DistributedCache.addCacheFile(new Path(filename).toUri(), conf); 

// In the mapper
Path[] myCacheFiles = DistributedCache.getLocalCacheFiles(job);
...

그러나 DistributedCache는 Hadoop 2.2.0에서 사용되지 않음으로 표시됩니다.

이것을 달성하기위한 새로운 선호 방식은 무엇입니까? 이 API를 다루는 최신 예제 또는 자습서가 있습니까?

해결법

  1. ==============================

    1.분산 캐시 용 API는 Job 클래스 자체에서 찾을 수 있습니다. http://hadoop.apache.org/docs/stable2/api/org/apache/hadoop/mapreduce/Job.html에서 설명서를 확인하십시오.  코드는 다음과 같아야합니다.

    분산 캐시 용 API는 Job 클래스 자체에서 찾을 수 있습니다. http://hadoop.apache.org/docs/stable2/api/org/apache/hadoop/mapreduce/Job.html에서 설명서를 확인하십시오.  코드는 다음과 같아야합니다.

    Job job = new Job();
    ...
    job.addCacheFile(new Path(filename).toUri());
    

    당신의 매퍼 코드에서 :

    Path[] localPaths = context.getLocalCacheFiles();
    ...
    
  2. ==============================

    2.@jtravaglini를 확장하기 위해 YARN / MapReduce 2에 DistributedCache를 사용하는 가장 좋은 방법은 다음과 같습니다.

    @jtravaglini를 확장하기 위해 YARN / MapReduce 2에 DistributedCache를 사용하는 가장 좋은 방법은 다음과 같습니다.

    드라이버에서 Job.addCacheFile ()

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
    
        Job job = Job.getInstance(conf, "MyJob");
    
        job.setMapperClass(MyMapper.class);
    
        // ...
    
        // Mind the # sign after the absolute file location.
        // You will be using the name after the # sign as your
        // file name in your Mapper/Reducer
        job.addCacheFile(new URI("/user/yourname/cache/some_file.json#some"));
        job.addCacheFile(new URI("/user/yourname/cache/other_file.json#other"));
    
        return job.waitForCompletion(true) ? 0 : 1;
    }
    

    그리고 매퍼 / 감속기에서 설치 (컨텍스트 컨텍스트) 방법을 재정의하십시오.

    @Override
    protected void setup(
            Mapper<LongWritable, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        if (context.getCacheFiles() != null
                && context.getCacheFiles().length > 0) {
    
            File some_file = new File("./some");
            File other_file = new File("./other");
    
            // Do things to these two files, like read them
            // or parse as JSON or whatever.
        }
        super.setup(context);
    }
    
  3. ==============================

    3.YARN / MR2 용 새로운 DistributedCache API는 org.apache.hadoop.mapreduce.Job 클래스에 있습니다.

    YARN / MR2 용 새로운 DistributedCache API는 org.apache.hadoop.mapreduce.Job 클래스에 있습니다.

       Job.addCacheFile()
    

    안타깝게도이 튜토리얼 스타일의 예제는 아직 많이 없습니다.

    http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/Job.html#addCacheFile%28java.net.URI%29

  4. ==============================

    4.나는 job.addCacheFile ()을 사용하지 않았다. 대신 이전처럼 "-files /path/to/myfile.txt#myfile"과 같은 -files 옵션을 사용했습니다. 그런 다음 매퍼 또는 감속기 코드에서 아래 방법을 사용합니다.

    나는 job.addCacheFile ()을 사용하지 않았다. 대신 이전처럼 "-files /path/to/myfile.txt#myfile"과 같은 -files 옵션을 사용했습니다. 그런 다음 매퍼 또는 감속기 코드에서 아래 방법을 사용합니다.

    /**
     * This method can be used with local execution or HDFS execution. 
     * 
     * @param context
     * @param symLink
     * @param throwExceptionIfNotFound
     * @return
     * @throws IOException
     */
    public static File findDistributedFileBySymlink(JobContext context, String symLink, boolean throwExceptionIfNotFound) throws IOException
    {
        URI[] uris = context.getCacheFiles();
        if(uris==null||uris.length==0)
        {
            if(throwExceptionIfNotFound)
                throw new RuntimeException("Unable to find file with symlink '"+symLink+"' in distributed cache");
            return null;
        }
        URI symlinkUri = null;
        for(URI uri: uris)
        {
            if(symLink.equals(uri.getFragment()))
            {
                symlinkUri = uri;
                break;
            }
        }   
        if(symlinkUri==null)
        {
            if(throwExceptionIfNotFound)
                throw new RuntimeException("Unable to find file with symlink '"+symLink+"' in distributed cache");
            return null;
        }
        //if we run this locally the file system URI scheme will be "file" otherwise it should be a symlink
        return "file".equalsIgnoreCase(FileSystem.get(context.getConfiguration()).getScheme())?(new File(symlinkUri.getPath())):new File(symLink);
    
    }
    

    그런 다음 매퍼 / 감속기에서 :

    @Override
    protected void setup(Context context) throws IOException, InterruptedException
    {
        super.setup(context);
    
        File file = HadoopUtils.findDistributedFileBySymlink(context,"myfile",true);
        ... do work ...
    }
    

    "-files /path/to/myfile.txt"를 직접 사용한 경우 기본 symlink 이름이므로 "myfile.txt"를 사용하여 파일에 액세스해야합니다.

  5. ==============================

    5.언급 된 솔루션 중 어느 것도 완벽하게 작동하지 않았습니다. Hadoop 버전이 계속 변경되기 때문에 hadoop 2.6.4를 사용하고 있습니다. 기본적으로 DistributedCache는 사용되지 않으므로 사용하지 않으려 고합니다. 일부 게시물은 addCacheFile ()을 사용하도록 제안했기 때문에 약간 변경되었습니다. 그것이 나를 위해 일한 방법은 다음과 같습니다.

    언급 된 솔루션 중 어느 것도 완벽하게 작동하지 않았습니다. Hadoop 버전이 계속 변경되기 때문에 hadoop 2.6.4를 사용하고 있습니다. 기본적으로 DistributedCache는 사용되지 않으므로 사용하지 않으려 고합니다. 일부 게시물은 addCacheFile ()을 사용하도록 제안했기 때문에 약간 변경되었습니다. 그것이 나를 위해 일한 방법은 다음과 같습니다.

    job.addCacheFile(new URI("hdfs://X.X.X.X:9000/EnglishStop.txt#EnglishStop.txt"));
    

    여기서 X.X.X.X는 마스터 IP 주소 또는 로컬 호스트 일 수 있습니다. EnglishStop.txt는 / location에 HDFS에 저장되었습니다.

    hadoop fs -ls /
    

    출력은 다음과 같습니다.

    -rw-r--r--   3 centos supergroup       1833 2016-03-12 20:24 /EnglishStop.txt
    drwxr-xr-x   - centos supergroup          0 2016-03-12 19:46 /test
    

    재미 있고 편리하지만 # EnglishStop.txt는 이제 매퍼에서 "EnglishStop.txt"로 액세스 할 수 있음을 의미합니다. 여기에 같은 코드가있다.

    public void setup(Context context) throws IOException, InterruptedException     
    {
        File stopwordFile = new File("EnglishStop.txt");
        FileInputStream fis = new FileInputStream(stopwordFile);
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis));
    
        while ((stopWord = reader.readLine()) != null) {
            // stopWord is a word read from Cache
        }
    }
    

    이것은 나를 위해 일했습니다. HDFS에 저장된 파일에서 라인을 읽을 수 있습니다.

  6. ==============================

    6.나는 똑같은 문제가 있었다. 또한 DistributedCach가 더 이상 사용되지 않을뿐만 아니라 getLocalCacheFiles 및 "새 작업"도 제공됩니다. 그래서 나를 위해 일한 것은 다음과 같습니다 :

    나는 똑같은 문제가 있었다. 또한 DistributedCach가 더 이상 사용되지 않을뿐만 아니라 getLocalCacheFiles 및 "새 작업"도 제공됩니다. 그래서 나를 위해 일한 것은 다음과 같습니다 :

    운전사:

    Configuration conf = getConf();
    Job job = Job.getInstance(conf);
    ...
    job.addCacheFile(new Path(filename).toUri());
    

    매퍼 / 감속기 설치시 :

    @Override
    protected void setup(Context context) throws IOException, InterruptedException
    {
        super.setup(context);
    
        URI[] files = context.getCacheFiles(); // getCacheFiles returns null
    
        Path file1path = new Path(files[0])
        ...
    }
    
  7. from https://stackoverflow.com/questions/21239722/hadoop-distributedcache-is-deprecated-what-is-the-preferred-api by cc-by-sa and MIT license