복붙노트

[HADOOP] 스파크에서 이진 파일을 처리하는 사용자 정의 하둡 입력 형식을 사용하여

HADOOP

스파크에서 이진 파일을 처리하는 사용자 정의 하둡 입력 형식을 사용하여

나는 바이너리 파일을 처리 하둡 기반 솔루션을 개발했습니다. 이것은 고전적인 하둡 MR 기술을 사용합니다. 바이너리 파일은 약 10GB이며 73 개 HDFS 블록으로 분할하고,지도 과정으로 작성 비즈니스 로직이 73 개 블록에 각각 운영하고 있습니다. 우리는지도 기능 키 (intWritable)과 가치 (BytesWritable)을 반환하는 customInputFormat와 하둡에 CustomRecordReader을 개발했습니다. 값은 HDFS 블록 (bianry 데이터)의 내용 뿐이다. 비즈니스 로직이 데이터를 읽는 방법을 알고있다.

지금, 나는 불꽃의 포트를이 코드를 싶습니다. 나는 불꽃에 선발 오전 스파크에서 간단한 예제 (단어 수, 파이 예)를 실행할 수 있습니다. 그러나, 간단한 예는 스파크의 BinaryFiles를 처리 할 수 ​​있습니다. 나는이 사용 사례에 대한 두 가지 솔루션이 있습니다 참조하십시오. 처음에, 사용자 입력 포맷 레코드 판독기를 사용하여 피. (가) 그 HDFS 블록에 대한 RDD를 생성 스파크의 방법 (방법)을 찾아 비즈니스 로직에 HDFS 블록의 콘텐츠를 공급 방법 등의지도를 사용합니다. 이것이 가능하지 않다면, 나는 사용자 정의 입력 형식과 등 HadoopAPI, HadoopRDD 내 문제 등의 몇 가지 방법을 사용하여 사용자 정의 리더를 다시 사용하고 싶습니다 : - 나도 몰라 첫 번째 방법이 가능 여부. 가능하면 사람이 예제가 들어 몇 가지 포인터를 제공하시기 바랍니다 수 있습니까? 나는 두 번째 방법은 있지만 매우 실패를 시도하고 있었다. 저는 여기에 사용되는 코드입니다

package org {  
object Driver {      
  def myFunc(key : IntWritable, content : BytesWritable):Int = {      
    println(key.get())
    println(content.getSize())
    return 1       
  }    
  def main(args: Array[String]) {       
    // create a spark context
    val conf = new SparkConf().setAppName("Dummy").setMaster("spark://<host>:7077")
    val sc = new SparkContext(conf)    
    println(sc)   
    val rd = sc.newAPIHadoopFile("hdfs:///user/hadoop/myBin.dat", classOf[RandomAccessInputFormat], classOf[IntWritable], classOf[BytesWritable])  
    val count = rd.map (x => myFunc(x._1, x._2)).reduce(_+_)
    println("The count is *****************************"+count)
  }
} 

}

주요 방법 인쇄 문이지도 기능 인쇄 0 내부의 인쇄 문 반면 블록의 수는 73를 인쇄 있습니다.

내가 잘못 여기서 뭐하는 거지 어디 사람이 말할 수 있습니까? 내가 API 올바른 방법으로 사용하고 있지 않다 생각하지만, 일부 문서 / 예문을 찾지 못했습니다.

해결법

  1. ==============================

    1.한 눈에 문제의 커플. 당신은 MYFUNC를 정의하지만, FUNC를 호출합니다. 귀하의 MYFUNC 더 반환 형식이 없습니다, 그래서 당신은 수집 호출 할 수 없습니다 (). 당신의 MYFUNC 진정 반환 값이없는 경우, 대신지도의 foreach 문을 수행 할 수 있습니다.

    한 눈에 문제의 커플. 당신은 MYFUNC를 정의하지만, FUNC를 호출합니다. 귀하의 MYFUNC 더 반환 형식이 없습니다, 그래서 당신은 수집 호출 할 수 없습니다 (). 당신의 MYFUNC 진정 반환 값이없는 경우, 대신지도의 foreach 문을 수행 할 수 있습니다.

    () 수집은 (드라이버에) 로컬로 물건을 할 수 있도록 드라이버에 대한 RDD의 데이터를 가져옵니다.

  2. ==============================

    2.나는이 문제에 일부 진전을 만들었습니다. 내가 지금 일을 아래의 기능을 사용하고 있습니다

    나는이 문제에 일부 진전을 만들었습니다. 내가 지금 일을 아래의 기능을 사용하고 있습니다

    var hRDD = new NewHadoopRDD(sc, classOf[RandomAccessInputFormat], 
            classOf[IntWritable], 
            classOf[BytesWritable],
            job.getConfiguration() 
            )    
    
    val count = hRDD.mapPartitionsWithInputSplit{ (split, iter) => myfuncPart(split, iter)}.collect()
    

    그러나 다른 오류로 내가 여기에 게시 한있는 세부 사항을 도착 스파크 맵 함수 내에서 HDFS의 파일에 액세스의 문제

    15/10/30 11:11:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 40.221.94.235): java.io.IOException: No FileSystem for scheme: spark
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
    
  3. from https://stackoverflow.com/questions/33373898/using-custom-hadoop-input-format-for-processing-binary-file-in-spark by cc-by-sa and MIT license