복붙노트

[HADOOP] Spark Hadoop에서 방송을받지 못했습니다

HADOOP

Spark Hadoop에서 방송을받지 못했습니다

스파크 제출 작업을 실행하고 "브로드 캐스트 _58_piece0 ...을 (를) 가져 오지 못했습니다."오류가 발생합니다. 내가 뭘 잘못하고 있는지 잘 모르겠습니다. UDF를 과도하게 사용합니까? 너무 복잡한 기능?

내 목표의 요약으로 pdf에서 텍스트를 구문 분석하고 JSON 객체에 base64로 인코딩 된 문자열로 저장됩니다. 나는 텍스트를 얻기 위해 Apache Tika를 사용하고 있으며 데이터 프레임을 많이 사용하여 더 쉽게 만들려고합니다.

RDD로 데이터에서 "main"외부의 함수로 tika를 통해 텍스트 추출을 실행하고 완벽하게 작동하는 코드를 작성했습니다. 데이터 프레임에서 UDF로 추출을 기본으로 가져 오려고하면 다양한 방법으로 붕괴됩니다. 여기에 오기 전에 실제로 최종 데이터 프레임을 다음과 같이 쓰려고했습니다.

valid.toJSON.saveAsTextFile(hdfs_dir)

이것은 모든 종류의 "파일 / 경로가 이미 존재합니다"라는 두통을 안겨주었습니다.

현재 코드 :

object Driver {

  def main(args: Array[String]):Unit = {
    val hdfs_dir = args(0)
    val spark_conf = new SparkConf().setAppName("Spark Tika HDFS")
    val sc = new SparkContext(spark_conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    import sqlContext.implicits._

    // load json data into dataframe
    val df = sqlContext.read.json("hdfs://hadoophost.com:8888/user/spark/data/in/*")

    val extractInfo: (Array[Byte] => String) = (fp: Array[Byte]) => {

      val parser:Parser = new AutoDetectParser()
      val handler:BodyContentHandler = new BodyContentHandler(Integer.MAX_VALUE)
      val config:TesseractOCRConfig  = new TesseractOCRConfig()
      val pdfConfig:PDFParserConfig = new PDFParserConfig()

      val inputstream:InputStream = new ByteArrayInputStream(fp)

      val metadata:Metadata = new  Metadata()
      val parseContext:ParseContext = new ParseContext()
      parseContext.set(classOf[TesseractOCRConfig], config)
      parseContext.set(classOf[PDFParserConfig], pdfConfig)
      parseContext.set(classOf[Parser], parser)
      parser.parse(inputstream, handler, metadata, parseContext)
      handler.toString
    }


    val extract_udf = udf(extractInfo)

    val df2 = df.withColumn("unbased_media", unbase64($"media_file")).drop("media_file")

    val dfRenamed = df2.withColumn("media_corpus", extract_udf(col("unbased_media"))).drop("unbased_media")

    val depuncter: (String => String) = (corpus: String) => {
        val r = corpus.replaceAll("""[\p{Punct}]""", "")
        val s = r.replaceAll("""[0-9]""", "")
        s
    }

    val depuncter_udf = udf(depuncter)

    val withoutPunct = dfRenamed.withColumn("sentence", depuncter_udf(col("media_corpus")))

    val model = sc.objectFile[org.apache.spark.ml.PipelineModel]("hdfs://hadoophost.com:8888/user/spark/hawkeye-nb-ml-v2.0").first()

    val with_predictions = model.transform(withoutPunct)

    val fullNameChecker: ((String, String, String, String, String) => String) = (fname: String, mname: String, lname: String, sfx: String, text: String) =>{
        val newtext = text.replaceAll(" ", "").replaceAll("""[0-9]""", "").replaceAll("""[\p{Punct}]""", "").toLowerCase
        val new_fname = fname.replaceAll(" ", "").replaceAll("""[0-9]""", "").replaceAll("""[\p{Punct}]""", "").toLowerCase
        val new_mname = mname.replaceAll(" ", "").replaceAll("""[0-9]""", "").replaceAll("""[\p{Punct}]""", "").toLowerCase
        val new_lname = lname.replaceAll(" ", "").replaceAll("""[0-9]""", "").replaceAll("""[\p{Punct}]""", "").toLowerCase
        val new_sfx = sfx.replaceAll(" ", "").replaceAll("""[0-9]""", "").replaceAll("""[\p{Punct}]""", "").toLowerCase
        val name_full = new_fname.concat(new_mname).concat(new_lname).concat(new_sfx)
        val c = name_full.r.findAllIn(newtext).length
        c match {
            case 0 => "N"
            case _ => "Y"
        }
    }

    val fullNameChecker_udf = udf(fullNameChecker)

    val stringChecker: ((String, String) => String) = (term: String, text: String) => {
        val termLower = term.replaceAll("""[\p{Punct}]""", "").toLowerCase
        val textLower = text.replaceAll("""[\p{Punct}]""", "").toLowerCase
        val c = termLower.r.findAllIn(textLower).length
        c match {
        case 0 => "N"
        case _ => "Y"
        }
    }

    val stringChecker_udf = udf(stringChecker)


    val stringChecker2: ((String, String) => String) = (term: String, text: String) => {
        val termLower = term takeRight 4
        val textLower = text
        val c = termLower.r.findAllIn(textLower).length
        c match {
        case 0 => "N"
        case _ => "Y"
        }
    }

    val stringChecker2_udf = udf(stringChecker)

    val valids = with_predictions.withColumn("fname_valid", stringChecker_udf(col("first_name"), col("media_corpus")))
                                            .withColumn("lname_valid", stringChecker_udf(col("last_name"), col("media_corpus")))
                                            .withColumn("fname2_valid", stringChecker_udf(col("first_name_2"), col("media_corpus")))
                                            .withColumn("lname2_valid", stringChecker_udf(col("last_name_2"), col("media_corpus")))
                                            .withColumn("camt_valid", stringChecker_udf(col("chargeoff_amount"), col("media_corpus")))
                                            .withColumn("ocan_valid", stringChecker2_udf(col("original_creditor_account_nbr"), col("media_corpus")))
                                            .withColumn("dpan_valid", stringChecker2_udf(col("debt_provider_account_nbr"), col("media_corpus")))
                                            .withColumn("full_name_valid", fullNameChecker_udf(col("first_name"), col("middle_name"), col("last_name"), col("suffix"), col("media_corpus")))
                                            .withColumn("full_name_2_valid", fullNameChecker_udf(col("first_name_2"), col("middle_name_2"), col("last_name_2"), col("suffix_2"), col("media_corpus")))


    valids.write.mode(SaveMode.Overwrite).format("json").save(hdfs_dir)


  }

}

오류로 시작하는 전체 스택 추적 :

16/06/14 15:02:01 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 53, hdpd11n05.squaretwofinancial.com): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:272)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_58_piece0 of broadcast_58
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1222)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.ml.feature.CountVectorizerModel$$anonfun$9$$anonfun$apply$7.apply(CountVectorizer.scala:222)
    at org.apache.spark.ml.feature.CountVectorizerModel$$anonfun$9$$anonfun$apply$7.apply(CountVectorizer.scala:221)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at org.apache.spark.ml.feature.CountVectorizerModel$$anonfun$9.apply(CountVectorizer.scala:221)
    at org.apache.spark.ml.feature.CountVectorizerModel$$anonfun$9.apply(CountVectorizer.scala:218)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr43$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:263)
    ... 8 more
Caused by: org.apache.spark.SparkException: Failed to get broadcast_58_piece0 of broadcast_58
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1219)
    ... 25 more

해결법

  1. ==============================

    1.비슷한 오류가 발생했습니다.

    비슷한 오류가 발생했습니다.

    CounterVectorModel의 브로드 캐스트 사용으로 인한 것으로 판명되었습니다. 내 경우의 자세한 원인은 다음과 같습니다.

    model.transform ()이 호출되면 어휘가 브로드 캐스트되고 모델에서 broadcastDic 속성으로 암시 적으로 저장됩니다. 따라서 model.transform ()을 호출 한 후 CounterVectorModel을 저장하면 전용 var 속성 broadcastDic도 저장됩니다. 그러나 불행하게도 Spark에서 브로드 캐스트 된 객체는 상황에 따라 달라지며 이는 SparkContext에 포함되어 있음을 의미합니다. 해당 CounterVectorModel이 다른 SparkContext에로드되면 이전에 저장된 broadcastDic을 찾지 못합니다.

    따라서 해결책 중 하나는 모델을 저장하기 전에 model.transform () 호출을 방지하거나 model.copy () 메소드로 모델을 복제하는 것입니다.

  2. ==============================

    2.이것을 가로 질러 오는 사람에게는 내가로드 한 모델이 잘못되었다는 것이 밝혀졌습니다. 얀 클라이언트 모드에서 spark-shell을 사용하고 코드를 단계별로 살펴 보았습니다. 모델을로드하려고 시도했지만 메타 데이터 디렉토리를 찾을 수 없다는 오류를 통해 데이터 그램 (model.transform)에 대해 모델을 실행했습니다.

    이것을 가로 질러 오는 사람에게는 내가로드 한 모델이 잘못되었다는 것이 밝혀졌습니다. 얀 클라이언트 모드에서 spark-shell을 사용하고 코드를 단계별로 살펴 보았습니다. 모델을로드하려고 시도했지만 메타 데이터 디렉토리를 찾을 수 없다는 오류를 통해 데이터 그램 (model.transform)에 대해 모델을 실행했습니다.

    나는 되돌아 가서 좋은 모델을 발견하고 그것에 맞서 뛰어 올랐다. 이 코드는 실제로 건전합니다.

  3. from https://stackoverflow.com/questions/37822567/spark-hadoop-failed-to-get-broadcast by cc-by-sa and MIT license