[HADOOP] 여러 파일을 단일 매퍼로 결합하는 TextLine과 같은 Scalding Source 만들기
HADOOP여러 파일을 단일 매퍼로 결합하는 TextLine과 같은 Scalding Source 만들기
우리는 많은 작은 파일을 결합해야합니다. Scalding에서는 TextLine을 사용하여 텍스트 줄로 파일을 읽을 수 있습니다. 문제는 파일 당 1 매퍼를 얻지 만 여러 파일을 결합하여 1 매퍼로 처리하도록하려는 것입니다.
필자는 입력 형식을 CombineFileInputFormat의 구현으로 변경해야한다는 것을 알고 있으며,이 작업에는 계단식 CombinedHfs를 사용할 수 있습니다. 이를 수행하는 방법을 배울 수는 없지만, 예를 들어 CombineTextLine이라는 자체 Scalding 소스를 정의하는 코드는 매우 소수에 불과합니다.
이 작업을 수행하는 코드를 제공 할 수있는 사람에게 많은 감사를드립니다.
부차적 인 질문으로, 우리는 s3에있는 몇 가지 데이터를 가지고 있습니다. 주어진 솔루션이 s3 파일에서 작동하면 좋을 것입니다 - 나는 그것이 CombineFileInputFormat 또는 CombinedHfs가 s3에서 작동하는지 여부에 달려 있다고 생각합니다.
해결법
-
==============================
1.당신은 당신의 질문에 아이디어를 얻습니다, 그래서 여기에 당신을위한 해결책이있을 것입니다.
당신은 당신의 질문에 아이디어를 얻습니다, 그래서 여기에 당신을위한 해결책이있을 것입니다.
CombineFileInputFormat을 확장하고 사용자 정의 RecordReader를 사용하는 사용자 입력 서식을 만듭니다. Java 코드를 보여 드리고 있습니다 만 원하는 경우 쉽게 스칼라로 변환 할 수 있습니다.
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.LineRecordReader; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.lib.CombineFileInputFormat; import org.apache.hadoop.mapred.lib.CombineFileRecordReader; import org.apache.hadoop.mapred.lib.CombineFileSplit; public class CombinedInputFormat<K, V> extends CombineFileInputFormat<K, V> { public static class MyKeyValueLineRecordReader implements RecordReader<LongWritable,Text> { private final RecordReader<LongWritable,Text> delegate; public MyKeyValueLineRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer idx) throws IOException { FileSplit fileSplit = new FileSplit(split.getPath(idx), split.getOffset(idx), split.getLength(idx), split.getLocations()); delegate = new LineRecordReader(conf, fileSplit); } @Override public boolean next(LongWritable key, Text value) throws IOException { return delegate.next(key, value); } @Override public LongWritable createKey() { return delegate.createKey(); } @Override public Text createValue() { return delegate.createValue(); } @Override public long getPos() throws IOException { return delegate.getPos(); } @Override public void close() throws IOException { delegate.close(); } @Override public float getProgress() throws IOException { return delegate.getProgress(); } } @Override public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { return new CombineFileRecordReader(job, (CombineFileSplit) split, reporter, (Class) MyKeyValueLineRecordReader.class); } }
그런 다음 TextLine 클래스를 확장하고 방금 정의한 입력 형식 (스칼라 코드)을 사용해야합니다.
import cascading.scheme.hadoop.TextLine import cascading.flow.FlowProcess import org.apache.hadoop.mapred.{OutputCollector, RecordReader, JobConf} import cascading.tap.Tap import com.twitter.scalding.{FixedPathSource, TextLineScheme} import cascading.scheme.Scheme class CombineFileTextLine extends TextLine{ override def sourceConfInit(flowProcess: FlowProcess[JobConf], tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]], conf: JobConf) { super.sourceConfInit(flowProcess, tap, conf) conf.setInputFormat(classOf[CombinedInputFormat[String, String]]) } }
결합 된 입력에 대한 체계를 만듭니다.
trait CombineFileTextLineScheme extends TextLineScheme{ override def hdfsScheme = new CombineFileTextLine().asInstanceOf[Scheme[JobConf,RecordReader[_,_],OutputCollector[_,_],_,_]] }
마지막으로 소스 클래스를 만듭니다.
case class CombineFileMultipleTextLine(p : String*) extends FixedPathSource(p :_*) with CombineFileTextLineScheme
여러 경로 대신 단일 경로를 사용하려는 경우 원본 클래스의 변경은 간단합니다.
도움이되기를 바랍니다.
-
==============================
2.이게 속임수 야. 그렇지? - https://wiki.apache.org/hadoop/HowManyMapsAndReduces
이게 속임수 야. 그렇지? - https://wiki.apache.org/hadoop/HowManyMapsAndReduces
from https://stackoverflow.com/questions/23917404/create-scalding-source-like-textline-that-combines-multiple-files-into-single-ma by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] hdfs의 하이브 파일은 관리되지 않는 (외부가 아닌) 테이블을 삭제할 때 삭제되지 않습니다. (0) | 2019.06.21 |
---|---|
[HADOOP] 테스트 HBase 연결 (0) | 2019.06.21 |
[HADOOP] Spark 작업 부하에 HDFS가 필요합니까? (0) | 2019.06.21 |
[HADOOP] JSON 파일 용 Hadoop (0) | 2019.06.21 |
[HADOOP] Spark - 접속시에 실패했습니다 : java.net.ConnectException - localhost (0) | 2019.06.21 |