[HADOOP] Hadoop 스트리밍 - 감속기 출력에서 후행 탭 제거
HADOOPHadoop 스트리밍 - 감속기 출력에서 후행 탭 제거
출력에 키 / 값 쌍이 포함되어 있지 않은 비공개 스트리밍 작업이 있습니다. 가치있는 쌍 또는 키 전용 쌍으로 생각할 수 있습니다.
내 스트리밍 감속기 (PHP 스크립트)는 개행 문자로 구분 된 레코드를 출력합니다. Hadoop 스트리밍은 이것을 값이없는 키로 취급하고 개행 전에 탭을 삽입합니다. 이 여분의 탭은 필요하지 않습니다.
어떻게 제거합니까?
AWS EMR과 함께 hadoop 1.0.3을 사용하고 있습니다. hadoop 1.0.3의 소스를 다운로드하고 hadoop-1.0.3 / src / contrib / streaming / src / java / org / apache / hadoop / streaming / PipeReducer.java에서이 코드를 찾았습니다.
reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");
그래서 -D stream.reduce.output.field.separator =를 행운의 인수로 전달하려고했습니다. 나는 또한 -D mapred.textoutputformat.separator =와 -D mapreduce.output.textoutputformat.separator =를 사용해 보았습니다.
나는 물론 구글을 수색했는데 내가 찾은 것이 아무것도 없었다. 한 검색 결과는 심지어 원하는 결과를 얻기 위해 전달할 수있는 인수가 없다고 말했습니다 (단,이 경우 hadoop 버전은 실제로 매우 오래되었습니다).
다음은 내 코드입니다 (가독성을 위해 줄 바꿈이 추가되었습니다).
hadoop jar streaming.jar -files s3n://path/to/a/file.json#file.json
-D mapred.output.compress=true -D stream.reduce.output.field.separator=
-input s3n://path/to/some/input/*/* -output hdfs:///path/to/output/dir
-mapper 'php my_mapper.php' -reducer 'php my_reducer.php'
해결법
-
==============================
1.org.apache.hadoop.mapreduce.lib.output.TextOutputFormat 소스를 보면, 나는 2 가지를 본다.
org.apache.hadoop.mapreduce.lib.output.TextOutputFormat 소스를 보면, 나는 2 가지를 본다.
유일한 해결책은이 두 가지 문제를 해결하는 자신 만의 OutputFormat을 작성하는 것입니다.
필자가 수행 한 작업에서 필자는
id1|val1|val2|val3 id1|val1
으로:
id1|val1,val2,val3 id2|val1
필자는 라인을 변환하는 커스텀 매퍼 (Perl 스크립트)를 가지고있었습니다. 이 작업을 수행하기 위해 처음에는 키 전용 (또는 값만 입력)으로 수행하려고 시도했지만 후행 탭으로 결과를 얻었습니다.
처음에는 방금 다음과 같이 지정했습니다.
이것은 매핑이 키를 원했기 때문에 맵퍼에 키, 값 쌍을 제공했습니다. 하지만이 출력에는 첫 번째 필드 뒤에 탭이 추가되었습니다.
추가 할 때 원하는 출력을 얻었습니다.
내가 그것을 설정하지 않았거나 공백으로 설정했다면
그러면 첫 번째 필드 뒤에 탭이 다시 생깁니다.
TextOutputFormat의 소스를 살펴보면
-
==============================
2.위의 팁을 사용하여 다른 사람들에게 도움이되는 것처럼 구현을 수행 할 수있었습니다.
위의 팁을 사용하여 다른 사람들에게 도움이되는 것처럼 구현을 수행 할 수있었습니다.
CustomOutputFormat<K, V> extends org.apache.hadoop.mapred.TextOutputFormat<K, V> {....}
정확히 'getRecordWriter'의 빌트인 구현의 1 행이 다음과 같이 변경되었습니다.
String keyValueSeparator = job.get("mapred.textoutputformat.separator", "");
대신에:
String keyValueSeparator = job.get("mapred.textoutputformat.separator", "\t");
그것을 Jar로 컴파일하고 그것을 (hadoop 스트리밍에 대한 지침을 통해) 내 hadoop 스트리밍 호출에 포함시킨 후 호출은 다음과 같이 보입니다.
hadoop jar /usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar \ -archives 'hdfs:///user/the/path/to/your/jar/onHDFS/theNameOfTheJar.jar' \ -libjars theNameOfTheJar.jar \ -outputformat com.yourcompanyHere.package.path.tojavafile.CustomOutputFormat \ -file yourMapper.py -mapper yourMapper.py \ -file yourReducer.py -reducer yourReducer.py \ -input $yourInputFile \ -output $yourOutputDirectoryOnHDFS
나는 또한 그 전화를 발급 한 폴더에 병을 넣었다.
그것은 내 요구에 잘 작동했습니다 (그리고 감속기 후 라인 끝 부분에 탭이 생성되지 않았습니다).
update : 이것은 다른 사용자에게 도움이되는 주석을 기반으로하며, 여기 내 CustomOutputFormat.java 파일의 전체 소스는 다음과 같습니다.
import java.io.DataOutputStream; import java.io.IOException; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; public class CustomOutputFormat<K, V> extends TextOutputFormat<K, V> { public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException { boolean isCompressed = getCompressOutput(job); //Channging the default from '\t' to blank String keyValueSeparator = job.get("mapred.textoutputformat.separator", ""); // '\t' if (!isCompressed) { Path file = FileOutputFormat.getTaskOutputPath(job, name); FileSystem fs = file.getFileSystem(job); FSDataOutputStream fileOut = fs.create(file, progress); return new LineRecordWriter<K, V>(fileOut, keyValueSeparator); } else { Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); // create the named codec CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job); // build the filename including the extension Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension()); FileSystem fs = file.getFileSystem(job); FSDataOutputStream fileOut = fs.create(file, progress); return new LineRecordWriter<K, V>(new DataOutputStream( codec.createOutputStream(fileOut)), keyValueSeparator); } } }
FYI : 사용 환경에 따라 매퍼와 감속기 사이에서 (키와 값을 분리하는 관점에서) 하프 스트리밍 관리 상호 작용에 나쁜 영향을 미치지 않는지 확인하십시오. 명확히하기 :
-
==============================
3.나도이 문제가 있었다. 기본적으로 CSV 데이터 라인을 방출하는 python 맵 전용 작업을 사용하고있었습니다. 결과를 조사한 후에 모든 행의 끝에 \ t를 적어 두었습니다.
나도이 문제가 있었다. 기본적으로 CSV 데이터 라인을 방출하는 python 맵 전용 작업을 사용하고있었습니다. 결과를 조사한 후에 모든 행의 끝에 \ t를 적어 두었습니다.
foo,bar,baz\t
필자가 발견 한 것은 매퍼 (mapper)와 파이썬 스트림 (Python stream) 모두 키 값 쌍을 다루는 것입니다. 기본 구분 기호를 내 보내지 않으면 CSV 데이터의 전체 줄이 "키"로 간주되고 키와 값이 필요한 프레임 워크는 \ t 및 빈 값을 찾습니다.
내 데이터는 기본적으로 CSV 문자열이므로 스트림 및 매핑 된 출력에 대한 구분 기호를 쉼표로 설정했습니다. 프레임 워크는 첫 번째 쉼표를 키로, 첫 번째 쉼표를 값으로 사용하여 모든 것을 읽습니다. 그런 다음 파일에 결과를 썼을 때 필자는 필자가 쓴 결과를 효과적으로 생성하는 핵심 쉼표 값을 썼다.
foo,bar,baz
내 경우, 프레임 워크가 내 CSV 출력 끝 부분에 \ t를 추가하지 못하도록 아래에 추가했습니다 ...
-D mapred.reduce.tasks=0 \ -D stream.map.output.field.separator=, \ -D mapred.textoutputformat.separator=, \
from https://stackoverflow.com/questions/18133290/hadoop-streaming-remove-trailing-tab-from-reducer-output by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] hdfs (namenode)에서 사용 된 네임 스페이스 및 메타 데이터의 의미는 무엇입니까? (0) | 2019.06.17 |
---|---|
[HADOOP] Amazon EC2 / S3를 사용하여 Hadoop 클러스터에서 HDFS로 로컬 데이터를 복사 할 때 발생하는 문제 (0) | 2019.06.17 |
[HADOOP] TaskTracker 하위 프로세스에 메모리를 구성하는 다양한 방법 (Mapper 및 Reduce Tasks) (0) | 2019.06.17 |
[HADOOP] 잘못된 구성 : namenode 주소 dfs.namenode.rpc-address가 구성되지 않았습니다. (0) | 2019.06.16 |
[HADOOP] Spark에서 S3 (프랑크푸르트) 사용하기 (0) | 2019.06.16 |