복붙노트

[HADOOP] HBase를에서 읽을 때 FLINK 직렬화 오류를 thowing

HADOOP

HBase를에서 읽을 때 FLINK 직렬화 오류를 thowing

나는 직렬화 오류가 발생하고 맵 내에서 richfatMapFunction를 사용하여 HBase를 읽을 때. 내가 뭘하려고 데이터 스트림은 HBase를 읽고 특정 문자열로 동일한 경우 다른 무시합니다. 아래는 내가 무엇입니까 샘플 프로그램 오류입니다.

package com.abb.Flinktest
import java.text.SimpleDateFormat
import java.util.Properties

import scala.collection.concurrent.TrieMap 
import org.apache.flink.addons.hbase.TableInputFormat
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.util.Collector
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.BinaryComparator
import org.apache.hadoop.hbase.filter.CompareFilter
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.Level
import org.apache.flink.api.common.functions.RichMapFunction

object Flinktesthbaseread {

  def main(args:Array[String])
  {
   val env = StreamExecutionEnvironment.createLocalEnvironment()
   val kafkaStream = env.fromElements("hello")
   val c=kafkaStream.map(x => if(x.equals("hello"))kafkaStream.flatMap(new ReadHbase()) )       
   env.execute()
  }
      class ReadHbase extends RichFlatMapFunction[String,Tuple11[String,String,String,String,String,String,String,String,String,String,String]] with Serializable
    {
        var conf: org.apache.hadoop.conf.Configuration = null;
    var table: org.apache.hadoop.hbase.client.HTable = null;
    var hbaseconnection:org.apache.hadoop.hbase.client.Connection =null
    var taskNumber: String = null;
    var rowNumber = 0;
    val serialVersionUID = 1L;

    override def open(parameters: org.apache.flink.configuration.Configuration) {
      println("getting table")
       conf = HBaseConfiguration.create()
      val in = getClass().getResourceAsStream("/hbase-site.xml")

      conf.addResource(in)
      hbaseconnection = ConnectionFactory.createConnection(conf)
      table = new HTable(conf, "testtable");
     // this.taskNumber = String.valueOf(taskNumber);
    }

     override def flatMap(msg:String,out:Collector[Tuple11[String,String,String,String,String,String,String,String,String,String,String]]) 
      {
                //flatmap operation here
      }

      override def close() {

      table.flushCommits();
      table.close();
    }

    }
}

오류:

log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:617)
    at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:959)
    at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:484)
    at com.abb.Flinktest.Flinktesthbaseread$.main(Flinktesthbaseread.scala:45)
    at com.abb.Flinktest.Flinktesthbaseread.main(Flinktesthbaseread.scala)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.scala.DataStream
    - field (class "com.abb.Flinktest.Flinktesthbaseread$$anonfun$1", name: "kafkaStream$1", type: "class org.apache.flink.streaming.api.scala.DataStream")
    - root object (class "com.abb.Flinktest.Flinktesthbaseread$$anonfun$1", <function1>)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
    ... 6 more

나는 방법 및 아와 같은 클래스가 직렬화함으로써 클래스,하지만 행운 내부의 필드를 포장했습니다. 사람이에 대한 몇 가지 빛을 던지거나이에 대한 몇 가지 해결 방법을 제안 할 수 없습니다.

해결법

  1. ==============================

    1.문제는 당신이 단순히 직렬화 할 수없는지도 기능의 카프카 스트림 변수에 액세스하려고하는 것입니다. 그것은 데이터의 단지 추상적 인 표현입니다. 그것은 처음에 함수를 무효화 아무것도 포함하지 않습니다.

    문제는 당신이 단순히 직렬화 할 수없는지도 기능의 카프카 스트림 변수에 액세스하려고하는 것입니다. 그것은 데이터의 단지 추상적 인 표현입니다. 그것은 처음에 함수를 무효화 아무것도 포함하지 않습니다.

    대신 같은 것을 할 :

       kafkaStream.filter(x => x.equals("hello")).flatMap(new ReadHBase())
    

    필터 기능은 조건이 true 인 요소를 유지할 것이며, 사람들은 당신의 flatMap 함수에 전달됩니다.

    당신이 변환을 지정할 때 실제로 어떻게되는지에 관해서는 약간의 오해가있을 나타나는 내가보기 엔, 기초 API 개념 설명서를 읽어보실 것을 추천 할 것입니다.

  2. from https://stackoverflow.com/questions/40446939/flink-thowing-serialization-error-when-reading-from-hbase by cc-by-sa and MIT license