복붙노트

[SCALA] 어떻게 스파크 멀티 라인 입력 레코드를 처리하는

SCALA

어떻게 스파크 멀티 라인 입력 레코드를 처리하는

내가 입력 파일 (매우 큰 파일)에서 여러 줄에 걸쳐 각 레코드의 확산이 있습니다.

전의:

Id:   2
ASIN: 0738700123
  title: Test tile for this product
  group: Book
  salesrank: 168501
  similar: 5  0738700811  1567184912  1567182813  0738700514  0738700915
  categories: 2
   |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Earth-Based Religions[12472]|Wicca[12484]
   |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Earth-Based Religions[12472]|Witchcraft[12486]
  reviews: total: 12  downloaded: 12  avg rating: 4.5
    2001-12-16  cutomer: A11NCO6YTE4BTJ  rating: 5  votes:   5  helpful:   4
    2002-1-7  cutomer:  A9CQ3PLRNIR83  rating: 4  votes:   5  helpful:   5

어떻게 식별하고 스파크의 각 멀티 라인 기록을 처리하는 방법을?

해결법

  1. ==============================

    1.멀티 라인 데이터가 정의 된 레코드 분리가있는 경우, 당신은 hadoop.Configuration 객체를 통해 분리를 제공, 다중 회선 레코드에 대한 하둡 지원을 사용할 수 있습니다 :

    멀티 라인 데이터가 정의 된 레코드 분리가있는 경우, 당신은 hadoop.Configuration 객체를 통해 분리를 제공, 다중 회선 레코드에 대한 하둡 지원을 사용할 수 있습니다 :

    이런 식으로 뭔가는해야한다 :

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    val conf = new Configuration
    conf.set("textinputformat.record.delimiter", "id:")
    val dataset = sc.newAPIHadoopFile("/path/to/data", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
    val data = dataset.map(x=>x._2.toString)
    

    이는 각각의 요소는 기록에 해당하는 RDD [문자열]를 제공합니다. 그 후 당신은 당신의 애플리케이션 요구 사항은 다음 각 레코드를 구문 분석 할 필요가있다.

  2. ==============================

    2.나는 사용자 정의 입력 형식과 기록 판독기를 구현하여 이런 짓을했는지.

    나는 사용자 정의 입력 형식과 기록 판독기를 구현하여 이런 짓을했는지.

    public class ParagraphInputFormat extends TextInputFormat {
    
        @Override
        public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
            return new ParagraphRecordReader();
        }
    }
    
    public class ParagraphRecordReader extends RecordReader<LongWritable, Text> {
        private long end;
        private boolean stillInChunk = true;
    
        private LongWritable key = new LongWritable();
        private Text value = new Text();
    
        private FSDataInputStream fsin;
        private DataOutputBuffer buffer = new DataOutputBuffer();
    
        private byte[] endTag = "\n\r\n".getBytes();
    
        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) inputSplit;
            Configuration conf = taskAttemptContext.getConfiguration();
            Path path = split.getPath();
            FileSystem fs = path.getFileSystem(conf);
    
            fsin = fs.open(path);
            long start = split.getStart();
            end = split.getStart() + split.getLength();
            fsin.seek(start);
    
            if (start != 0) {
                readUntilMatch(endTag, false);
            }
        }
    
        public boolean nextKeyValue() throws IOException {
            if (!stillInChunk) return false;
    
            boolean status = readUntilMatch(endTag, true);
    
            value = new Text();
            value.set(buffer.getData(), 0, buffer.getLength());
            key = new LongWritable(fsin.getPos());
            buffer.reset();
    
            if (!status) {
                stillInChunk = false;
            }
    
            return true;
        }
    
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }
    
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }
    
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }
    
        public void close() throws IOException {
            fsin.close();
        }
    
        private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
            int i = 0;
            while (true) {
                int b = fsin.read();
                if (b == -1) return false;
                if (withinBlock) buffer.write(b);
                if (b == match[i]) {
                    i++;
                    if (i >= match.length) {
                        return fsin.getPos() < end;
                    }
                } else i = 0;
            }
        }
    
    }
    

    종료 태그는 각 레코드의 끝을 식별합니다.

  3. from https://stackoverflow.com/questions/27541637/how-to-process-multi-line-input-records-in-spark by cc-by-sa and MIT license