[HADOOP] 하나의지도에 여러 줄의 텍스트 제공
HADOOP하나의지도에 여러 줄의 텍스트 제공
저는 Hadoop을 사용하여 N 개의 라인을 단일 매핑에 보내려고했습니다. 나는 이미 줄이 쪼개 질 필요가 없다.
나는 NLineInputFormat을 사용하려했지만 데이터에서 N 행의 텍스트를 한 번에 한 행씩 각 매퍼로 보냅니다.
나는 옵션을 설정하려고 시도했다. 그리고 N 라인 만 입력하면 한 번에 한 라인 씩 각 맵에 그것을 보낸다.
job.setInt("mapred.line.input.format.linespermap", 10);
LineRecordReader :: next를 오버라이드 할 것을 권하는 메일 링리스트를 발견했는데, 그렇게 간단하지는 않습니다. 내부 데이터 멤버가 모두 private이기 때문입니다.
난 그냥 NLineInputFormat에 대한 소스를 확인하고 그것을 하드 코드 LineReader, 그래서 우선하지 도움이되지 않습니다.
또한 Amazon EC2 MapReduce와의 호환성을 위해 Hadoop 0.18을 사용하고 있습니다.
해결법
-
==============================
1.자신 만의 입력 형식을 구현해야합니다. 당신은 또한 자신의 레코드 리더를 정의 할 수 있습니다.
자신 만의 입력 형식을 구현해야합니다. 당신은 또한 자신의 레코드 리더를 정의 할 수 있습니다.
불행히도 getSplits () - 메서드를 정의해야합니다. 내 의견으로는 이것은 레코드 판독기를 구현하는 것보다 어려울 것이다.이 방법은 입력 데이터를 청크하는 로직을 구현해야한다.
"Hadoop - 최종 가이드"(내가 항상 추천할만한 훌륭한 책)에서 발췌 한 내용을 참조하십시오.
다음은 인터페이스입니다.
public interface InputFormat<K, V> { InputSplit[] getSplits(JobConf job, int numSplits) throws IOException; RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException; }
JobClient는 getSplits () 메서드를 호출하여 원하는 수의 맵 작업을 전달합니다. numSplits 인수로 InputFormat이 구현 한 것처럼이 숫자는 힌트로 취급됩니다. 멘션은 다른 수의 스플릿을 원하는 수만큼 반환 할 수 있습니다. numSplits. 분할을 계산하면 클라이언트는이를 구직자에게 보냅니다. 저장 위치를 사용하여 맵 작업을 예약하여 작업 추적자에서 처리합니다.
태스크 트래커에서 맵 태스크는 split을 getRecordReader () 메소드로 전달합니다. InputFormat을 사용하여 해당 분할에 대한 RecordReader를 가져옵니다. RecordReader는 레코드에 대한 반복자이며 맵 태스크는 레코드 키 - 값 쌍을 생성하기 위해 하나를 사용하며, 맵 기능으로 전달됩니다. 코드 스 니펫 (MapRunner의 코드를 기반으로 함) 아이디어를 설명합니다.
K key = reader.createKey(); V value = reader.createValue(); while (reader.next(key, value)) { mapper.map(key, value, output, reporter); }
-
==============================
2.최근 NLineInputFormat을 재정의하고 기본 LineReader 대신 사용자 정의 MultiLineRecordReader를 구현하는 자체 InputFormat을 작성하여이 문제를 해결했습니다.
최근 NLineInputFormat을 재정의하고 기본 LineReader 대신 사용자 정의 MultiLineRecordReader를 구현하는 자체 InputFormat을 작성하여이 문제를 해결했습니다.
NLineInputFormat을 확장하기로 결정한 이유는 분할 당 정확히 N 줄을 사용한다는 동일한 보장이 있기 때문입니다.
이 레코드 리더는 http://bigdatacircus.com/2012/08/01/wordcount-with-custom-record-reader-of-textinputformat/에서 가져온 것입니다.
내가 수정 한 유일한 사항은 maxLineLength 속성이 이제는 새로운 API를 사용하고, NLINESTOPROCESS 값은 NLineInputFormat의 setNumLinesPerSplit ()에서 하드 코딩 된 것입니다 (더 많은 유연성을 위해).
결과는 다음과 같습니다.
public class MultiLineInputFormat extends NLineInputFormat{ @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context) { context.setStatus(genericSplit.toString()); return new MultiLineRecordReader(); } public static class MultiLineRecordReader extends RecordReader<LongWritable, Text>{ private int NLINESTOPROCESS; private LineReader in; private LongWritable key; private Text value = new Text(); private long start =0; private long end =0; private long pos =0; private int maxLineLength; @Override public void close() throws IOException { if (in != null) { in.close(); } } @Override public LongWritable getCurrentKey() throws IOException,InterruptedException { return key; } @Override public Text getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { if (start == end) { return 0.0f; } else { return Math.min(1.0f, (pos - start) / (float)(end - start)); } } @Override public void initialize(InputSplit genericSplit, TaskAttemptContext context)throws IOException, InterruptedException { NLINESTOPROCESS = getNumLinesPerSplit(context); FileSplit split = (FileSplit) genericSplit; final Path file = split.getPath(); Configuration conf = context.getConfiguration(); this.maxLineLength = conf.getInt("mapreduce.input.linerecordreader.line.maxlength",Integer.MAX_VALUE); FileSystem fs = file.getFileSystem(conf); start = split.getStart(); end= start + split.getLength(); boolean skipFirstLine = false; FSDataInputStream filein = fs.open(split.getPath()); if (start != 0){ skipFirstLine = true; --start; filein.seek(start); } in = new LineReader(filein,conf); if(skipFirstLine){ start += in.readLine(new Text(),0,(int)Math.min((long)Integer.MAX_VALUE, end - start)); } this.pos = start; } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (key == null) { key = new LongWritable(); } key.set(pos); if (value == null) { value = new Text(); } value.clear(); final Text endline = new Text("\n"); int newSize = 0; for(int i=0;i<NLINESTOPROCESS;i++){ Text v = new Text(); while (pos < end) { newSize = in.readLine(v, maxLineLength,Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),maxLineLength)); value.append(v.getBytes(),0, v.getLength()); value.append(endline.getBytes(),0, endline.getLength()); if (newSize == 0) { break; } pos += newSize; if (newSize < maxLineLength) { break; } } } if (newSize == 0) { key = null; value = null; return false; } else { return true; } } } }
-
==============================
3.귀하의 경우 위임 패턴을 따르고 LineRecordReader를 래퍼 (wrapper)로 구현하여 하나의 라인이 아닌 N 개의 라인을 연결하는 값을 설정하는 데 필요한 next () (또는 nextKeyValue ()) 메소드를 재정의 할 수 있다고 생각합니다.
귀하의 경우 위임 패턴을 따르고 LineRecordReader를 래퍼 (wrapper)로 구현하여 하나의 라인이 아닌 N 개의 라인을 연결하는 값을 설정하는 데 필요한 next () (또는 nextKeyValue ()) 메소드를 재정의 할 수 있다고 생각합니다.
필자는 ParagraphRecordReader의 모범적 인 구현을 시도하여 EOF 또는 빈 줄을 만나기 전까지 입력 데이터를 한 줄씩 읽고 연결하는 LineRecordReader를 사용합니다. 그런 다음 값을 한 줄 대신 단락으로 사용하여 쌍을 반환합니다. 또한이 ParagraphRecordReader의 ParagraphInputFormat은 표준 TextInputFormat만큼 간단합니다.
이 구현에 필요한 링크와 다음 게시물에 대한 몇 가지 단어를 찾을 수 있습니다. http://hadoop-mapreduce.blogspot.com/2011/03/little-more-complicated-recordreaders.html
베스트
from https://stackoverflow.com/questions/2711118/multiple-lines-of-text-to-a-single-map by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] pyspark / spark를 사용하여 큰 분산 데이터 세트 샘플링 (0) | 2019.06.15 |
---|---|
[HADOOP] Hive에 저장된 데이터에 대해 돼지 쿼리 실행 (0) | 2019.06.15 |
[HADOOP] RDD가 아닌 Spark 프로그램에서 일반 텍스트 HDFS (또는 로컬) 파일을 쓸 수 있습니까? (0) | 2019.06.15 |
[HADOOP] (하둡) MapReduce - 체인 작업 - JobControl이 멈추지 않습니다 (0) | 2019.06.15 |
[HADOOP] 먼저 실행되는 것은 파티셔너 또는 결합기입니까? (0) | 2019.06.15 |