복붙노트

[HADOOP] 스파크 스트리밍 : HDFS

HADOOP

스파크 스트리밍 : HDFS

어떤 이유로 (예 : 데모, 배포) 내 Spark 작업이 중단되었지만 HDFS 디렉토리로의 쓰기 / 이동이 지속되는 경우 Spark Streaming Job을 한 번 실행하면 해당 파일을 건너 뛸 수 있습니다.

    val hdfsDStream = ssc.textFileStream("hdfs://sandbox.hortonworks.com/user/root/logs")

    hdfsDStream.foreachRDD(
      rdd => logInfo("Number of records in this batch: " + rdd.count())
    )

출력 ->이 배치의 레코드 수 : 0

해결법

  1. ==============================

    1.Dean이 언급했듯이 textFileStream은 새 파일을 사용하는 경우에만 기본값을 사용합니다.

    Dean이 언급했듯이 textFileStream은 새 파일을 사용하는 경우에만 기본값을 사용합니다.

      def textFileStream(directory: String): DStream[String] = {
        fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
      }
    

    그래서,이 모든 것은 fileStream의 변형입니다.

    def fileStream[
        K: ClassTag,
        V: ClassTag,
        F <: NewInputFormat[K, V]: ClassTag
      ] (directory: String): InputDStream[(K, V)] = {
        new FileInputDStream[K, V, F](this, directory)
      }
    

    그리고 FileInputDStream 클래스를 살펴보면 실제로 기존 파일을 찾을 수 있지만 기본값은 new로만 나타납니다.

    newFilesOnly: Boolean = true,
    

    따라서 StreamingContext 코드로 돌아 가면 fileStream 메서드를 직접 호출하여 사용할 수있는 오버로드가 있음을 알 수 있습니다.

    def fileStream[
     K: ClassTag,
     V: ClassTag,
     F <: NewInputFormat[K, V]: ClassTag] 
    (directory: String, filter: Path => Boolean, newFilesOnly: Boolean):InputDStream[(K, V)] = {
      new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
    }
    
    ssc.fileStream[LongWritable, Text, TextInputFormat]
        (directory, FileInputDStream.defaultFilter, false).map(_._2.toString)
    
  2. ==============================

    2.Spark이 이미 디렉토리에있는 파일을 읽을 것으로 기대합니까? 그렇다면 이것은 일반적인 오해입니다. 놀라움으로 저를 데려갔습니다. textFileStream은 새로운 파일이 나타날 디렉토리를 감시하고 읽습니다. 시작하거나 파일을 이미 읽었을 때 이미 디렉토리에있는 파일을 무시합니다.

    Spark이 이미 디렉토리에있는 파일을 읽을 것으로 기대합니까? 그렇다면 이것은 일반적인 오해입니다. 놀라움으로 저를 데려갔습니다. textFileStream은 새로운 파일이 나타날 디렉토리를 감시하고 읽습니다. 시작하거나 파일을 이미 읽었을 때 이미 디렉토리에있는 파일을 무시합니다.

    이론적으로는 HDFS에 파일을 쓰는 프로세스가있을 것입니다. 그런 다음 Spark에서 읽기를 원할 것입니다. 이러한 파일은 원자 적으로 많이 나타납니다. 예를 들어 천천히 다른 곳에 작성한 다음 감시 된 디렉토리로 이동했습니다. 이는 HDFS가 파일 읽기 및 쓰기를 동시에 제대로 처리하지 않기 때문입니다.

  3. ==============================

    3.

    val filterF = new Function[Path, Boolean] {
        def apply(x: Path): Boolean = {
          println("looking if "+x+" to be consider or not")
          val flag = if(x.toString.split("/").last.split("_").last.toLong < System.currentTimeMillis){ println("considered "+x); list += x.toString; true}
           else{ false }
          return flag
        }
    }
    

    이 필터 기능은 각 경로가 실제로 사용자가 선호하는 경로인지 여부를 결정하는 데 사용됩니다. 따라서 적용 내 기능은 사용자 요구 사항에 따라 사용자 정의되어야합니다.

    val streamed_rdd = ssc.fileStream[LongWritable, Text, TextInputFormat]("/user/hdpprod/temp/spark_streaming_output",filterF,false).map{case (x, y) => (y.toString)}
    

    이제 filestream 함수의 세 번째 변수를 false로 설정해야합니다. 새 파일뿐만 아니라 스트리밍 디렉토리의 기존 기존 파일도 고려해야합니다.

  4. from https://stackoverflow.com/questions/29022379/spark-streaming-hdfs by cc-by-sa and MIT license