복붙노트

[HADOOP] Hadoop 스트리밍 - 감속기 출력에서 ​​후행 탭 제거

HADOOP

Hadoop 스트리밍 - 감속기 출력에서 ​​후행 탭 제거

출력에 키 / 값 쌍이 포함되어 있지 않은 비공개 스트리밍 작업이 있습니다. 가치있는 쌍 또는 키 전용 쌍으로 생각할 수 있습니다.

내 스트리밍 감속기 (PHP 스크립트)는 개행 문자로 구분 된 레코드를 출력합니다. Hadoop 스트리밍은 이것을 값이없는 키로 취급하고 개행 전에 탭을 삽입합니다. 이 여분의 탭은 필요하지 않습니다.

어떻게 제거합니까?

AWS EMR과 함께 hadoop 1.0.3을 사용하고 있습니다. hadoop 1.0.3의 소스를 다운로드하고 hadoop-1.0.3 / src / contrib / streaming / src / java / org / apache / hadoop / streaming / PipeReducer.java에서이 코드를 찾았습니다.

reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");

그래서 -D stream.reduce.output.field.separator =를 행운의 인수로 전달하려고했습니다. 나는 또한 -D mapred.textoutputformat.separator =와 -D mapreduce.output.textoutputformat.separator =를 사용해 보았습니다.

나는 물론 구글을 수색했는데 내가 찾은 것이 아무것도 없었다. 한 검색 결과는 심지어 원하는 결과를 얻기 위해 전달할 수있는 인수가 없다고 말했습니다 (단,이 경우 hadoop 버전은 실제로 매우 오래되었습니다).

다음은 내 코드입니다 (가독성을 위해 줄 바꿈이 추가되었습니다).

hadoop jar streaming.jar -files s3n://path/to/a/file.json#file.json
    -D mapred.output.compress=true -D stream.reduce.output.field.separator=
    -input s3n://path/to/some/input/*/* -output hdfs:///path/to/output/dir
    -mapper 'php my_mapper.php' -reducer 'php my_reducer.php'

해결법

  1. ==============================

    1.org.apache.hadoop.mapreduce.lib.output.TextOutputFormat 소스를 보면, 나는 2 가지를 본다.

    org.apache.hadoop.mapreduce.lib.output.TextOutputFormat 소스를 보면, 나는 2 가지를 본다.

    유일한 해결책은이 두 가지 문제를 해결하는 자신 만의 OutputFormat을 작성하는 것입니다.

    필자가 수행 한 작업에서 필자는

    id1|val1|val2|val3
    id1|val1
    

    으로:

    id1|val1,val2,val3
    id2|val1
    

    필자는 라인을 변환하는 커스텀 매퍼 (Perl 스크립트)를 가지고있었습니다. 이 작업을 수행하기 위해 처음에는 키 전용 (또는 값만 입력)으로 수행하려고 시도했지만 후행 탭으로 결과를 얻었습니다.

    처음에는 방금 다음과 같이 지정했습니다.

    이것은 매핑이 키를 원했기 때문에 맵퍼에 키, 값 쌍을 제공했습니다. 하지만이 출력에는 첫 번째 필드 뒤에 탭이 추가되었습니다.

    추가 할 때 원하는 출력을 얻었습니다.

    내가 그것을 설정하지 않았거나 공백으로 설정했다면

    그러면 첫 번째 필드 뒤에 탭이 다시 생깁니다.

    TextOutputFormat의 소스를 살펴보면

  2. ==============================

    2.위의 팁을 사용하여 다른 사람들에게 도움이되는 것처럼 구현을 수행 할 수있었습니다.

    위의 팁을 사용하여 다른 사람들에게 도움이되는 것처럼 구현을 수행 할 수있었습니다.

    CustomOutputFormat<K, V> extends org.apache.hadoop.mapred.TextOutputFormat<K, V> {....}
    

    정확히 'getRecordWriter'의 빌트인 구현의 1 행이 다음과 같이 변경되었습니다.

    String keyValueSeparator = job.get("mapred.textoutputformat.separator", ""); 
    

    대신에:

    String keyValueSeparator = job.get("mapred.textoutputformat.separator", "\t"); 
    

    그것을 Jar로 컴파일하고 그것을 (hadoop 스트리밍에 대한 지침을 통해) 내 hadoop 스트리밍 호출에 포함시킨 후 호출은 다음과 같이 보입니다.

    hadoop   jar  /usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar     \
    -archives 'hdfs:///user/the/path/to/your/jar/onHDFS/theNameOfTheJar.jar' \
    -libjars theNameOfTheJar.jar \
    -outputformat com.yourcompanyHere.package.path.tojavafile.CustomOutputFormat  \
    -file yourMapper.py    -mapper  yourMapper.py     \
    -file yourReducer.py   -reducer yourReducer.py    \
    -input $yourInputFile    \
    -output $yourOutputDirectoryOnHDFS
    

    나는 또한 그 전화를 발급 한 폴더에 병을 넣었다.

    그것은 내 요구에 잘 작동했습니다 (그리고 감속기 후 라인 끝 부분에 탭이 생성되지 않았습니다).

    update : 이것은 다른 사용자에게 도움이되는 주석을 기반으로하며, 여기 내 CustomOutputFormat.java 파일의 전체 소스는 다음과 같습니다.

    import java.io.DataOutputStream;
    import java.io.IOException;
    
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordWriter;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import org.apache.hadoop.util.Progressable;
    import org.apache.hadoop.util.ReflectionUtils;
    
    public class CustomOutputFormat<K, V> extends TextOutputFormat<K, V> {
    
        public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name,
            Progressable progress) throws IOException {
        boolean isCompressed = getCompressOutput(job);
    
        //Channging the default from '\t' to blank
        String keyValueSeparator = job.get("mapred.textoutputformat.separator", ""); // '\t'
        if (!isCompressed) {
            Path file = FileOutputFormat.getTaskOutputPath(job, name);
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, progress);
            return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
        } else {
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
                GzipCodec.class);
            // create the named codec
            CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
            // build the filename including the extension
            Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, progress);
            return new LineRecordWriter<K, V>(new DataOutputStream(
                codec.createOutputStream(fileOut)), keyValueSeparator);
        }
        }
    }
    

    FYI : 사용 환경에 따라 매퍼와 감속기 사이에서 (키와 값을 분리하는 관점에서) 하프 스트리밍 관리 상호 작용에 나쁜 영향을 미치지 않는지 확인하십시오. 명확히하기 :

  3. ==============================

    3.나도이 문제가 있었다. 기본적으로 CSV 데이터 라인을 방출하는 python 맵 전용 작업을 사용하고있었습니다. 결과를 조사한 후에 모든 행의 끝에 \ t를 적어 두었습니다.

    나도이 문제가 있었다. 기본적으로 CSV 데이터 라인을 방출하는 python 맵 전용 작업을 사용하고있었습니다. 결과를 조사한 후에 모든 행의 끝에 \ t를 적어 두었습니다.

     foo,bar,baz\t
    

    필자가 발견 한 것은 매퍼 (mapper)와 파이썬 스트림 (Python stream) 모두 키 값 쌍을 다루는 것입니다. 기본 구분 기호를 내 보내지 않으면 CSV 데이터의 전체 줄이 "키"로 간주되고 키와 값이 필요한 프레임 워크는 \ t 및 빈 값을 찾습니다.

    내 데이터는 기본적으로 CSV 문자열이므로 스트림 및 매핑 된 출력에 대한 구분 기호를 쉼표로 설정했습니다. 프레임 워크는 첫 번째 쉼표를 키로, 첫 번째 쉼표를 값으로 사용하여 모든 것을 읽습니다. 그런 다음 파일에 결과를 썼을 때 필자는 필자가 쓴 결과를 효과적으로 생성하는 핵심 쉼표 값을 썼다.

     foo,bar,baz
    

    내 경우, 프레임 워크가 내 CSV 출력 끝 부분에 \ t를 추가하지 못하도록 아래에 추가했습니다 ...

    -D mapred.reduce.tasks=0 \
    -D stream.map.output.field.separator=, \
    -D mapred.textoutputformat.separator=, \
    
  4. from https://stackoverflow.com/questions/18133290/hadoop-streaming-remove-trailing-tab-from-reducer-output by cc-by-sa and MIT license