복붙노트

[SCALA] 스칼라에서 여러 기록을 구문 분석

SCALA

스칼라에서 여러 기록을 구문 분석

여기 내 RDD입니다 [문자열]

M1 module1
PIP a Z A
PIP b Z B
PIP c Y n4

M2 module2
PIP a I n4
PIP b O D
PIP c O n5

등등. 기본적으로,에 따라 반복 될 수있는 후속 PIP 라인의 키 RDD (라인 1상의 제 단어를 포함하는) 및 값을 필요로한다.

나는 다음을 시도했습니다

val usgPairRDD = usgRDD.map(x => (x.split("\\n")(0), x))

그러나 이것은 나에게 다음과 같은 출력을 제공합니다

(,)
(M1 module1,M1 module1)
(PIP a Z A,PIP a Z A)
(PIP b Z B,PIP b Z B)
(PIP c Y n4,PIP c Y n4)
(,)
(M2 module2,M2 module2)
(PIP a I n4,PIP a I n4)
(PIP b O D,PIP b O D)
(PIP c O n5,PIP c O n5)

대신에, 나는 것으로 출력을하고 싶습니다

module1, (PIP a Z A, PIP b Z B, PIP b Z B)
module2, (PIP a I n4,PIP b O D, PIP c O n5)

내가 무엇을 잘못하고 있지? 나는 스파크 API에 아주 새로운 오전. 감사

안녕하세요 @ zero323

usgRDD.take(10).foreach(x => println(x + "%%%%%%%%%"))

수익률 ...

%%%%%%%%%
M1 module1%%%%%%%%%
PIP a Z A%%%%%%%%%
PIP b Z B%%%%%%%%%
PIP c Y n4%%%%%%%%%
%%%%%%%%%
M2 module2%%%%%%%%%
PIP a I n4%%%%%%%%%
PIP b O D%%%%%%%%%
PIP c O n5%%%%%%%%%

등등

안녕하세요 @ zero323 및 @Daniel Darabos 내 입력은 많은 많은 파일 (TBS에 이르기까지)의 매우 매우 큰 집합입니다. 여기서 샘플이다 ..

BIN n4
BIN n5
BIN D
BIN E
PIT A I A
PIT B I B 
PIT C I C
PIT D O D
PIT E O E
DEF M1 module1
   PIP a Z A
   PIP b Z B
   PIP c Y n4
DEF M2 module2
   PIP a I n4
   PIP b O D
   PIP c O n5

나는 3 가지 RDDS의 모든 방식, PIT 및 DEF (아래를 포함하여 PIP 라인)가 필요합니다. 여기에 내가 현재이 일을하고 어떻게 (논의에서, 나는 usgRDD 아래 잘못 계산 감지)

val binRDD = levelfileRDD.filter(line => line.contains("BIN"))
val pitRDD = levelfileRDD.filter(line => line.contains("PIT"))
val usgRDD = levelfileRDD.filter(line => !line.contains("BIN") && !line.contains("PIT")).flatMap(s=>s.split("DEF").map(_.trim))

내가 나중에 유효성 검사를 수행해야하기 때문에 나는 RDDs의 (순간) 3 가지가 필요합니다. N4는 BIN 요소가 "DEF M2 모듈 2 '에서 예"N4 "존재 만 할 수있다. RDDs에서, 나는 GraphX ​​API를 (나는 분명히이 시점 개까지 오지 않은)을 사용하여 파생 관계를 바랍니다. 각 usgPairRDD은 (달리 usgRDD로부터 계산 또는) 다음을 인쇄하는 경우는 이상적인 것

module1, (a Z A, b Z B, c Y n4) %%%%%%%
module2, (a I n4, b O D, c O n5) %%%%%%%

나는 감각을 만드는 중이 바랍니다. 스파크 신들에 대한 사과, 나는 아니라고합니다.

해결법

  1. ==============================

    1.기본적 스파크으로 한 줄에 하나의 요소를 작성합니다. 그것은 귀하의 경우 모든 레코드는 의견 다니엘 Darabos에 의해 명시된 바와 같이, 다른 노동자에 의해 처리 될 수있는 여러 요소의 확산을 의미한다.

    기본적 스파크으로 한 줄에 하나의 요소를 작성합니다. 그것은 귀하의 경우 모든 레코드는 의견 다니엘 Darabos에 의해 명시된 바와 같이, 다른 노동자에 의해 처리 될 수있는 여러 요소의 확산을 의미한다.

    데이터가 상대적으로 정기적으로하고 사용자 정의 구분 기호로 newAPIHadoopFile을 사용할 수 있어야 빈 라인으로 분리 된 것 같습니다 이후 :

    import org.apache.spark.rdd.RDD
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.hadoop.io.{LongWritable, Text}
    
    val path: String = ???
    
    val conf = new org.apache.hadoop.mapreduce.Job().getConfiguration
    conf.set("textinputformat.record.delimiter", "\n\n")
    
    val usgRDD = sc.newAPIHadoopFile(
        path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
      .map{ case (_, v) => v.toString }
    
    val usgPairRDD: RDD[(String, Seq[String])] = usgRDD.map(_.split("\n") match {
      case Array(x, xs @ _*) => (x, xs)
    })
    
  2. from https://stackoverflow.com/questions/34157104/parsing-multiline-records-in-scala by cc-by-sa and MIT license