복붙노트

[HADOOP] 임팔라가 그것을 읽을 수 있도록 스파크 스트리밍의 출력물을 만드는 방법?

HADOOP

임팔라가 그것을 읽을 수 있도록 스파크 스트리밍의 출력물을 만드는 방법?

Spark Streaming API에 다음과 같은 문제가 있습니다. 나는 현재 Flume to Spark Streaming을 통해 입력 데이터를 스트리밍하고 있는데,이 데이터로 전처리를 할 계획이다. 그런 다음 데이터를 Hadoop의 파일 시스템에 저장하고 임팔라로 쿼리하고 싶습니다. 그러나 Spark는 데이터 파일을 별도의 디렉토리에 쓰고 모든 RDD에 대해 새로운 디렉토리가 생성됩니다.

이것은 문제입니다. 우선 임팔라의 외부 테이블은 하위 디렉토리를 감지 할 수 없으며 분할되지 않은 경우 디렉토리 안의 파일 만 감지하기 때문입니다. 둘째, 새로운 디렉토리는 Spark에 의해 매우 빠르게 추가되어 생성 된 모든 디렉토리에 대해 Impala에서 주기적으로 새 파티션을 생성하는 것이 성능면에서 매우 좋지 않습니다. 반면에 Spark에서 쓰기 작업의 롤 간격을 늘리면 디렉토리가 자주 생성되지 않으므로 Impala에서 들어오는 데이터를 읽을 때까지 지연이 추가됩니다. 시스템이 실시간 응용 프로그램을 지원해야하기 때문에 이는 허용되지 않습니다. Hive에서 다음 설정을 사용하여 파티셔닝 없이도 하위 테이블을 검색하도록 외부 테이블을 구성 할 수있었습니다.

set hive.mapred.supports.subdirectories=true;
set mapred.input.dir.recursive=true;

그러나 임팔라에는 이와 같은 기능이 없습니다.

현재 Flume에서 데이터를 읽고 HDFS에 쓰는 데 다음 코드를 사용하고 있습니다.

val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8"))).saveAsTextFiles(path)

여기서 변수 경로는 텍스트 파일 (part-0000 등)이 추가되는 디렉토리의 접두사를 결정하고 나머지 디렉토리 이름은 Spark에 의해 생성 된 타임 스탬프입니다. 코드를 다음과 같이 변경할 수 있습니다.

val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
val mapStream = stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8")))
mapStream.foreachRDD(rdd => rdd.saveAsTextFile(path))

이 경우 파일은 경로로 결정된 동일한 디렉토리에 추가되지만 part-00000, part-00001, part-00002 등으로 명명되므로 이전에 생성 된 파일을 덮어 씁니다. Spark 소스 코드를 검사하는 동안 SparkHadoopWriter의 open () 메서드에서 파일의 이름이 한 줄로 결정됩니다.

val outputName = "part-"  + numfmt.format(splitID)

Spark API를 통해 splitID를 조작 할 방법이 없다고 생각됩니다. 요약하면, 제 질문은 다음과 같습니다 :

해결법

  1. ==============================

    1.임팔라는 말할 수 없습니다.

    임팔라는 말할 수 없습니다.

    part-xxxxx는 Spark이 따르는 hadoop 규칙입니다. 대부분의 도구는이 형식을 이해하며 스파크가 그것에 대해 많은 것을 할 수 없다고 생각합니다. 파트 파일은 고유해야하며 파티션 번호를 파일 이름에 추가하는 것이 일반적인 기술입니다.

    Impala에서는 대부분의 hadoop 도구가이 방법으로 생성하므로 파트 파일을 읽는 방법을 살펴볼 것입니다.

    디렉토리 구조를 사용자 정의하려는 경우 - 이것이 사용자의 질문이 아니지만 접두사 - 타임 스탬프 - 접미어 형식을 변경하는 것이 쉽게 이루어질 수 있습니다. Spark Steaming은 사용자 정의 할 수있는 Spark의 RDD.saveAsTextFiles (..)를 사용합니다. 다음은 DStream.scala의 코드입니다.

      def saveAsTextFiles(prefix: String, suffix: String = "") {
        val saveFunc = (rdd: RDD[T], time: Time) => {
          val file = rddToFileName(prefix, suffix, time)
          rdd.saveAsTextFile(file)
        }
        this.foreachRDD(saveFunc)
      }
    
  2. from https://stackoverflow.com/questions/24204656/how-to-make-spark-streaming-write-its-output-so-that-impala-can-read-it by cc-by-sa and MIT license