[SCALA] 어떻게 스파크 멀티 라인 입력 레코드를 처리하는
SCALA어떻게 스파크 멀티 라인 입력 레코드를 처리하는
내가 입력 파일 (매우 큰 파일)에서 여러 줄에 걸쳐 각 레코드의 확산이 있습니다.
전의:
Id: 2
ASIN: 0738700123
title: Test tile for this product
group: Book
salesrank: 168501
similar: 5 0738700811 1567184912 1567182813 0738700514 0738700915
categories: 2
|Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Earth-Based Religions[12472]|Wicca[12484]
|Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Earth-Based Religions[12472]|Witchcraft[12486]
reviews: total: 12 downloaded: 12 avg rating: 4.5
2001-12-16 cutomer: A11NCO6YTE4BTJ rating: 5 votes: 5 helpful: 4
2002-1-7 cutomer: A9CQ3PLRNIR83 rating: 4 votes: 5 helpful: 5
어떻게 식별하고 스파크의 각 멀티 라인 기록을 처리하는 방법을?
해결법
-
==============================
1.멀티 라인 데이터가 정의 된 레코드 분리가있는 경우, 당신은 hadoop.Configuration 객체를 통해 분리를 제공, 다중 회선 레코드에 대한 하둡 지원을 사용할 수 있습니다 :
멀티 라인 데이터가 정의 된 레코드 분리가있는 경우, 당신은 hadoop.Configuration 객체를 통해 분리를 제공, 다중 회선 레코드에 대한 하둡 지원을 사용할 수 있습니다 :
이런 식으로 뭔가는해야한다 :
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io.{LongWritable, Text} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat val conf = new Configuration conf.set("textinputformat.record.delimiter", "id:") val dataset = sc.newAPIHadoopFile("/path/to/data", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf) val data = dataset.map(x=>x._2.toString)
이는 각각의 요소는 기록에 해당하는 RDD [문자열]를 제공합니다. 그 후 당신은 당신의 애플리케이션 요구 사항은 다음 각 레코드를 구문 분석 할 필요가있다.
-
==============================
2.나는 사용자 정의 입력 형식과 기록 판독기를 구현하여 이런 짓을했는지.
나는 사용자 정의 입력 형식과 기록 판독기를 구현하여 이런 짓을했는지.
public class ParagraphInputFormat extends TextInputFormat { @Override public RecordReader<LongWritable, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) { return new ParagraphRecordReader(); } } public class ParagraphRecordReader extends RecordReader<LongWritable, Text> { private long end; private boolean stillInChunk = true; private LongWritable key = new LongWritable(); private Text value = new Text(); private FSDataInputStream fsin; private DataOutputBuffer buffer = new DataOutputBuffer(); private byte[] endTag = "\n\r\n".getBytes(); public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { FileSplit split = (FileSplit) inputSplit; Configuration conf = taskAttemptContext.getConfiguration(); Path path = split.getPath(); FileSystem fs = path.getFileSystem(conf); fsin = fs.open(path); long start = split.getStart(); end = split.getStart() + split.getLength(); fsin.seek(start); if (start != 0) { readUntilMatch(endTag, false); } } public boolean nextKeyValue() throws IOException { if (!stillInChunk) return false; boolean status = readUntilMatch(endTag, true); value = new Text(); value.set(buffer.getData(), 0, buffer.getLength()); key = new LongWritable(fsin.getPos()); buffer.reset(); if (!status) { stillInChunk = false; } return true; } public LongWritable getCurrentKey() throws IOException, InterruptedException { return key; } public Text getCurrentValue() throws IOException, InterruptedException { return value; } public float getProgress() throws IOException, InterruptedException { return 0; } public void close() throws IOException { fsin.close(); } private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException { int i = 0; while (true) { int b = fsin.read(); if (b == -1) return false; if (withinBlock) buffer.write(b); if (b == match[i]) { i++; if (i >= match.length) { return fsin.getPos() < end; } } else i = 0; } } }
종료 태그는 각 레코드의 끝을 식별합니다.
from https://stackoverflow.com/questions/27541637/how-to-process-multi-line-input-records-in-spark by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 때 스칼라 중위 표기법에 괄호를 사용하는 (0) | 2019.11.05 |
---|---|
[SCALA] 삽입 순서 스칼라지도 구현 유지 항목? (0) | 2019.11.05 |
[SCALA] 스파크에 타임 스탬프로 문자열 필드를 변환하는 더 나은 방법 (0) | 2019.11.05 |
[SCALA] java.lang.NoClassDefFoundError가 : 조직 / 아파치 / 스파크 / 스트리밍 / 트위터 / TwitterUtils $ TwitterPopularTags을 실행하는 동안 (0) | 2019.11.05 |
[SCALA] 많은 작은 파일을 작성 dataframe 쓰기 방법을 불꽃 (0) | 2019.11.05 |