복붙노트

[HADOOP] 여러 파일을 단일 매퍼로 결합하는 TextLine과 같은 Scalding Source 만들기

HADOOP

여러 파일을 단일 매퍼로 결합하는 TextLine과 같은 Scalding Source 만들기

우리는 많은 작은 파일을 결합해야합니다. Scalding에서는 TextLine을 사용하여 텍스트 줄로 파일을 읽을 수 있습니다. 문제는 파일 당 1 매퍼를 얻지 만 여러 파일을 결합하여 1 매퍼로 처리하도록하려는 것입니다.

필자는 입력 형식을 CombineFileInputFormat의 구현으로 변경해야한다는 것을 알고 있으며,이 작업에는 계단식 CombinedHfs를 사용할 수 있습니다. 이를 수행하는 방법을 배울 수는 없지만, 예를 들어 CombineTextLine이라는 자체 Scalding 소스를 정의하는 코드는 매우 소수에 불과합니다.

이 작업을 수행하는 코드를 제공 할 수있는 사람에게 많은 감사를드립니다.

부차적 인 질문으로, 우리는 s3에있는 몇 가지 데이터를 가지고 있습니다. 주어진 솔루션이 s3 파일에서 작동하면 좋을 것입니다 - 나는 그것이 CombineFileInputFormat 또는 CombinedHfs가 s3에서 작동하는지 여부에 달려 있다고 생각합니다.

해결법

  1. ==============================

    1.당신은 당신의 질문에 아이디어를 얻습니다, 그래서 여기에 당신을위한 해결책이있을 것입니다.

    당신은 당신의 질문에 아이디어를 얻습니다, 그래서 여기에 당신을위한 해결책이있을 것입니다.

    CombineFileInputFormat을 확장하고 사용자 정의 RecordReader를 사용하는 사용자 입력 서식을 만듭니다. Java 코드를 보여 드리고 있습니다 만 원하는 경우 쉽게 스칼라로 변환 할 수 있습니다.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.LineRecordReader;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
    import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
    import org.apache.hadoop.mapred.lib.CombineFileSplit;
    
    public class CombinedInputFormat<K, V> extends CombineFileInputFormat<K, V> {
    
        public static class MyKeyValueLineRecordReader implements RecordReader<LongWritable,Text> {
            private final RecordReader<LongWritable,Text> delegate;
    
            public MyKeyValueLineRecordReader(CombineFileSplit split, Configuration conf, Reporter reporter, Integer idx) throws IOException {
                FileSplit fileSplit = new FileSplit(split.getPath(idx), split.getOffset(idx), split.getLength(idx), split.getLocations());
                delegate = new LineRecordReader(conf, fileSplit);
            }
    
            @Override
            public boolean next(LongWritable key, Text value) throws IOException {
                return delegate.next(key, value);
            }
    
            @Override
            public LongWritable createKey() {
                return delegate.createKey();
            }
    
            @Override
            public Text createValue() {
                return delegate.createValue();
            }
    
            @Override
            public long getPos() throws IOException {
                return delegate.getPos();
            }
    
            @Override
            public void close() throws IOException {
                delegate.close();
            }
    
            @Override
            public float getProgress() throws IOException {
                return delegate.getProgress();
            }
        }
    
        @Override
        public RecordReader getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
            return new CombineFileRecordReader(job, (CombineFileSplit) split, reporter, (Class) MyKeyValueLineRecordReader.class);
        }
    
    }
    

    그런 다음 TextLine 클래스를 확장하고 방금 정의한 입력 형식 (스칼라 코드)을 사용해야합니다.

    import cascading.scheme.hadoop.TextLine
    import cascading.flow.FlowProcess
    import org.apache.hadoop.mapred.{OutputCollector, RecordReader, JobConf}
    import cascading.tap.Tap
    import com.twitter.scalding.{FixedPathSource, TextLineScheme}
    import cascading.scheme.Scheme
    
    class CombineFileTextLine extends TextLine{
    
      override def sourceConfInit(flowProcess: FlowProcess[JobConf], tap: Tap[JobConf, RecordReader[_, _], OutputCollector[_, _]], conf: JobConf) {
        super.sourceConfInit(flowProcess, tap, conf)
        conf.setInputFormat(classOf[CombinedInputFormat[String, String]])
      }
    }
    

    결합 된 입력에 대한 체계를 만듭니다.

    trait CombineFileTextLineScheme extends TextLineScheme{
    
      override def hdfsScheme = new CombineFileTextLine().asInstanceOf[Scheme[JobConf,RecordReader[_,_],OutputCollector[_,_],_,_]]
    }
    

    마지막으로 소스 클래스를 만듭니다.

    case class CombineFileMultipleTextLine(p : String*) extends  FixedPathSource(p :_*) with CombineFileTextLineScheme
    

    여러 경로 대신 단일 경로를 사용하려는 경우 원본 클래스의 변경은 간단합니다.

    도움이되기를 바랍니다.

  2. ==============================

    2.이게 속임수 야. 그렇지? - https://wiki.apache.org/hadoop/HowManyMapsAndReduces

    이게 속임수 야. 그렇지? - https://wiki.apache.org/hadoop/HowManyMapsAndReduces

  3. from https://stackoverflow.com/questions/23917404/create-scalding-source-like-textline-that-combines-multiple-files-into-single-ma by cc-by-sa and MIT license