복붙노트

[HADOOP] java.lang.IllegalArgumentException : 잘못된 FS :, 예상 됨 : hdfs : // localhost : 9000

HADOOP

java.lang.IllegalArgumentException : 잘못된 FS :, 예상 됨 : hdfs : // localhost : 9000

줄이는 쪽 조인을 구현하려고하는데 mapfile 판독기를 사용하여 분산 캐시를 찾지 만 stderr에서 다음 오류가 표시되면 lookupfile 파일이 이미 hdfs에 있고 올바르게로드 된 것처럼 보입니다. stdout에 표시된대로 캐시에 저장하십시오.

이것은 내 운전 코드입니다.

package mr_poc;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class driverrsj extends Configured implements Tool{

    @Override
    public int run(String[] arg) throws Exception {
if(arg.length != 3)
{
    System.out.printf("3 parameters are required for DriverRSJ- <Input Dir1> <Input Dir2> <Output Dir> \n");
    return -1;
}
Job job = new Job(getConf());
Configuration conf = job.getConfiguration();
DistributedCache.addCacheFile(new URI("/input/delivery_status"), conf);
System.out.println("Cache : " + job.getConfiguration().get("mapred.cache.files"));
job.setJarByClass(driverrsj.class);
conf.setInt("cust_info", 1);
conf.setInt("status", 2);
StringBuilder inputPaths = new StringBuilder();
inputPaths.append(arg[0].toString()).append(",").append(arg[1].toString());
FileInputFormat.setInputPaths(job, inputPaths.toString());
FileOutputFormat.setOutputPath(job, new Path(arg[2]));
job.setJarByClass(driverrsj.class);
job.setMapperClass(mappperRSJ.class);
job.setReducerClass(reducerrsj.class);
job.setMapOutputKeyClass(CompositeKeyWritableRSJ.class);
job.setMapOutputValueClass(Text.class);
//job.setPartitionerClass(partinonrsj.class);
job.setSortComparatorClass(secondarysortcomp.class);
job.setGroupingComparatorClass(GroupingComparatorRSJ.class);
job.setNumReduceTasks(1);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);


boolean success =job.waitForCompletion(true);
return success? 0 : 1;

    }

    public static void main(String[] args) throws Exception{
        int exitCode = ToolRunner.run(new Configuration(), new driverrsj(),args);
        System.exit(exitCode);

    }


}

내 감속기 코드예요.

패키지 mr_poc;

import java.io.File;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class reducerrsj extends Reducer<CompositeKeyWritableRSJ, Text, NullWritable, Text>{
    StringBuilder reduceValueBuilder = new StringBuilder("");
    NullWritable nullWritableKey = NullWritable.get();
    Text reduceOutputValue = new Text("");
    String strSeparator = ",";
    private MapFile.Reader deptMapReader = null;
    Text txtMapFileLookupKey = new Text();
    Text txtMapFileLookupValue = new Text();
    //Path[] cacheFilesLocal;
    //Path[] eachPath;

    @Override
    protected void setup(Context context) throws IOException,InterruptedException {
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());



        for ( Path eachPath : cacheFiles){

            System.out.println(eachPath.toString());
              System.out.println(eachPath.getName());
            if(eachPath.getName().toString().contains("delivery_status"))
            {

                URI uriUncompressedFile = new File(eachPath.toString()+ "/DeliveryStatusCodes").toURI();
                initializeDepartmentsMap(uriUncompressedFile, context);

            }
            }
        }

    //@SuppressWarnings("deprecation")
    private void initializeDepartmentsMap(URI uriUncompressedFile, Context context)
    throws IOException {
    // {{
    // Initialize the reader of the map file (side data)
        Configuration conf = context.getConfiguration();
        conf.addResource(new Path("/usr/local/hadoop-1.2.1/conf/core-site.xml"));
        FileSystem dfs = FileSystem.get(conf);
    try {


    deptMapReader = new MapFile.Reader(dfs,uriUncompressedFile.toString(), context.getConfiguration());
    } catch (Exception e) {
    e.printStackTrace();
    }
    // }}
    }
    private StringBuilder buildOutputValue(CompositeKeyWritableRSJ key,
            StringBuilder reduceValueBuilder, Text value) {

            if (key.getsourceindex() == 2) {


            String arrSalAttributes[] = value.toString().split(",");
            txtMapFileLookupKey.set(arrSalAttributes[0].toString());
            System.out.println("key=" + txtMapFileLookupKey);


            try {

            deptMapReader.get(txtMapFileLookupKey, txtMapFileLookupValue);
            }
             catch (Exception e) {
            txtMapFileLookupValue.set("");
                e.printStackTrace();
            } finally {
            txtMapFileLookupValue
            .set((txtMapFileLookupValue.equals(null) || txtMapFileLookupValue
            .equals("")) ? "NOT-FOUND"
            : txtMapFileLookupValue.toString());
            }

            reduceValueBuilder.append(txtMapFileLookupValue.toString());


            } else if(key.getsourceindex() == 1) {

            String arrEmpAttributes[] = value.toString().split(",");
            reduceValueBuilder.append(arrEmpAttributes[0].toString()).append(
            strSeparator);
            } 



            txtMapFileLookupKey.set("");
            txtMapFileLookupValue.set("");

            return reduceValueBuilder;
    }

    @Override
    public void reduce(CompositeKeyWritableRSJ key, Iterable<Text> values,
    Context context) throws IOException, InterruptedException {


    for (Text value : values) {
    buildOutputValue(key, reduceValueBuilder, value);
    }

    // Drop last comma, set value, and emit output
    if (reduceValueBuilder.length() > 1) {

    //reduceValueBuilder.setLength(reduceValueBuilder.length() - 1);
    // Emit output
    reduceOutputValue.set(reduceValueBuilder.toString());
    context.write(nullWritableKey, reduceOutputValue);
    } else {
    System.out.println("Key=" + key.getjoinkey() + "src="
    + key.getsourceindex());

    }
    // Reset variables
    reduceValueBuilder.setLength(0);
    reduceOutputValue.set("");

    }
    @Override
    protected void cleanup(Context context) throws IOException,
    InterruptedException {
         if(deptMapReader != null)
         {
deptMapReader.close();
    }
    }
}

이것은 내 핵심 사이트 Xml입니다.

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->

<configuration>
<property>
  <name>hadoop.tmp.dir</name>
  <value>/app/hadoop/tmp</value>
  <description>A base for other temporary directories.</description>
</property>
<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost:9000</value>
  <description>The name of the default file system.  A URI whose
  scheme and authority determine the FileSystem implementation.  The
  uri's scheme determines the config property (fs.SCHEME.impl) naming
  the FileSystem implementation class.  The uri's authority is used to
  determine the host, port, etc. for a filesystem.</description>
</property>
</configuration>

어떤 도움을 주시면 감사하겠습니다. 미리 감사드립니다 !!!

해결법

  1. ==============================

    1.나는 같은 문제가 있었는데, 나는 그것을

    나는 같은 문제가 있었는데, 나는 그것을

    FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"),conf)
    

    드라이버 클래스에서.

    java.net.URI에서 URI를 가져와야합니다.

  2. ==============================

    2.나는 비슷한 문제가 발생했다고 생각한다. 이 문제점의 핵심은 로컬 파일 시스템에 있어야하는 DistributedCache에서 SequenceFile을 조작하려고한다는 것입니다. 로그에서 한 줄이 있습니다.

    나는 비슷한 문제가 발생했다고 생각한다. 이 문제점의 핵심은 로컬 파일 시스템에 있어야하는 DistributedCache에서 SequenceFile을 조작하려고한다는 것입니다. 로그에서 한 줄이 있습니다.

    "org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:140)" 
    

    SequenceFile.Reader의 소스 코드를 확인하면이 코드로 인해 로그가 생성되었음을 알 수 있습니다

    fs.getFileStatus(filename).getLen()    
    

    여기서 "fs"는 DistributedFileSystem 대신 LocalFileSystem이어야합니다.

    내 솔루션은 그 변화입니다.

    deptMapReader = new MapFile.Reader(dfs,uriUncompressedFile.toString(), context.getConfiguration());
    

    JobConf conf = context.getConfiguration();
    String originalFS = conf.get("fs.default.name");   //backup original configuration
    conf.set("fs.default.name", "file:///");           //change configuration to local file system
    deptMapReader = new MapFile.Reader(dfs,uriUncompressedFile.toString(), conf);
    conf.set("fs.default.name", originalFS);           //restore original configuration
    

    이렇게하면 SequenceFile.Reader 객체는 로컬 파일 시스템의 캐시 된 파일에 액세스 할 수 있습니다.

    SequenceFile API가 변경 되었기 때문에이 문제가 발생한다고 생각합니다.이 경우 MapFile.Reader (fs, path, conf)와 같은 SequenceFile.Reader의 일부 API는 사용되지 않습니다.

    이 솔루션은 나를 위해 잘 작동합니다.

  3. ==============================

    3.core-site.xml 파일에 따라 conf의 속성을 다음과 같이 설정해야합니다.

    core-site.xml 파일에 따라 conf의 속성을 다음과 같이 설정해야합니다.

    conf.set("fs.defaultFS", "hdfs://host:port");
    conf.set("mapreduce.jobtracker.address", "host:port");
    
  4. ==============================

    4.직업 주자에 아래 라인을 포함 :  DistributedCache.addCacheFile (새 URI ( ""), conf);

    직업 주자에 아래 라인을 포함 :  DistributedCache.addCacheFile (새 URI ( ""), conf);

    mapper의 설정 방법에있는 코드 아래

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration configuration = new Configuration();
        FileSystem fileSystem = null;
        try {
             fileSystem = FileSystem.get(new URI("<File location"),configuration);
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    
        String location = <S3 file location>;
        FSDataInputStream fsDataInputStream =fileSystem.open(new Path(location));
        Scanner scanner = new Scanner(fsDataInputStream);
        int i = 1;
        while(scanner.hasNextLine()) {
            String str[] = scanner.nextLine().split(",");
            LOG.info("keys are \t" + str[0] + str[1]);
            stickerMap.put(str[0] + str[1], i);
            ++i;
        }
    }
    
  5. from https://stackoverflow.com/questions/23779976/java-lang-illegalargumentexception-wrong-fs-expected-hdfs-localhost9000 by cc-by-sa and MIT license