복붙노트

[HADOOP] 문제 : Hbase에서 데이터를 검색하는 Spark 셸의 스칼라 코드

HADOOP

문제 : Hbase에서 데이터를 검색하는 Spark 셸의 스칼라 코드

Spark 셸에서 간단한 Hash 코드를 실행하여 Hbase에서 데이터를 검색하려고합니다. Hadoop 환경은 Kerberos를 지원하므로 kinit를 실행할 수 있습니다.

Spark Shell을 호출하는 단계 :

MASTER=yarn-client

DRIVER_CLASSPATH="/opt/cloudera/parcels/CDH/lib/hbase/lib/*"
DRIVER_LIBRARY_PATH="/opt/cloudera/parcels/CDH/lib/hadoop/lib/native"

spark-shell --driver-class-path "$DRIVER_CLASSPATH" --driver-library-path "$DRIVER_LIBRARY_PATH" --driver-memory 10G --executor-memory 15G --executor-cores 8 --num-executors 3 --master $MASTER

암호:

import org.apache.hadoop.fs._
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io._
import org.apache.hadoop.hbase.mapreduce._
import org.apache.hadoop.hbase.util._
import org.apache.spark._

val hc = HBaseConfiguration.create
hc.addResource(new Path("file:///opt/cloudera/parcels/CDH/lib/hbase/conf/hbase-site.xml"))

hc.addResource(new Path("file:///opt/cloudera/parcels/CDH/lib/hbase/conf/core-site.xml"))

hc.set(TableInputFormat.INPUT_TABLE, "poc-customers")
val rdd = sc.newAPIHadoopRDD(hc, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

rdd.count

다음은 아래 ERROR입니다.

org.apache.hadoop.hbase.client.RetriesExhaustedException: Can't get the location
        at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:308)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:149)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:57)
        at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
        at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:293)
        at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:268)
        at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:140)
        at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:135)
        at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:888)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.restart(TableRecordReaderImpl.java:90)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.initialize(TableRecordReaderImpl.java:167)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReader.initialize(TableRecordReader.java:134)
        at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase$1.initialize(TableInputFormatBase.java:200)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
        at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
        at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Could not set up IO Streams to <management-node-server-hostname>/10.118.114.40:60020
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:773)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:881)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:850)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1184)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:216)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:300)
        at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.get(ClientProtos.java:31865)
        at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRowOrBefore(ProtobufUtil.java:1580)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1294)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1126)
        at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:299)
        ... 23 more
Caused by: java.lang.RuntimeException: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$1.run(RpcClientImpl.java:673)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.handleSaslConnectionFailure(RpcClientImpl.java:631)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:739)
        ... 33 more
Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
        at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
        at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:605)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:154)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:731)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:728)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:728)
        ... 33 more
Caused by: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
        at sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
        at sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:121)
        at sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
        at sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:223)
        at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
        at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
        at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:193)
        ... 42 more

참고 사항 :

이 문제를 해결하는 데 도움을 얻으 려합니다.

해결법

  1. ==============================

    1.Spark "드라이버"가 YARN에 클러스터의 어딘가에 "실행자"를 생성하도록 요청하면 kinit로 만든 로컬 Kerberos TGT를 사용하여 인증합니다. 그런 다음 YARN은 HDFS 및 YARN에 액세스하기 위해 모든 집행자가 공유하는 글로벌 위임 토큰을 발급합니다.

    Spark "드라이버"가 YARN에 클러스터의 어딘가에 "실행자"를 생성하도록 요청하면 kinit로 만든 로컬 Kerberos TGT를 사용하여 인증합니다. 그런 다음 YARN은 HDFS 및 YARN에 액세스하기 위해 모든 집행자가 공유하는 글로벌 위임 토큰을 발급합니다.

    아아아, HBase는 위임 토큰을 지원하지 않습니다. 각 실행자는 로컬 TGT를 사용하여 ZK로, 그런 다음 실제 HBase RegionServer로 다시 인증해야합니다.

    완벽한 세계에서는 "spark-default.conf"에 spark.yarn.principal과 spark.yarn.keytab이라는 두 개의 속성을 삽입해야합니다 ( "ktutil"유틸리티를 사용하여 암호를 저장하는 키탭을 만드는 것이 좋습니다) )

    아아,이 기능은 HBase 초기 인증이 아닌 HDFS 위임 토큰을 갱신해야하는 장기 실행 스트리밍 작업 (일반적으로 7 일마다)을 위해 제작되었습니다. 이제, Spark 1.6 릴리즈 노트는 YARN 및 Kerberos와 관련된 많은 버그 수정을 보여주었습니다.이 기능은 이제 HBase에도 기본적으로 적용됩니다. 그러나 나는 그것에 내기하지 않을 것이다.

    그래서 해결 방법은 무엇입니까?

    그런 식으로 사용하면 UGI는 TGT를 비공개로 유지합니다. 캐시에 표시되지 않으므로 동일한 시스템의 다른 프로세스가 재사용 할 수 없습니다. 다른 프로세스의 kinit은이를 변경하지 않습니다. .

  2. ==============================

    2.문제의 근본 원인은 다음과 같습니다.

    문제의 근본 원인은 다음과 같습니다.

    GSSException : 유효한 자격 증명이 제공되지 않는다 (기구 레벨 : Kerberos tgt를 찾지 못했습니다)

    Cloudera Troubleshooting Guide에서이 문제에 대한 해결책 제시

    제안 된 해결책을 시도해 볼 수 있습니다.

  3. ==============================

    3.나는 OP와 같은 프로젝트에서 일하고있다. 우리는 Samson Scharfrichter의 답변을 직접 사용하지 않았지만 이런 종류의 해결책이 가능하다는 확신을주었습니다. 여기 우리를 위해 일한 것이 있습니다 :

    나는 OP와 같은 프로젝트에서 일하고있다. 우리는 Samson Scharfrichter의 답변을 직접 사용하지 않았지만 이런 종류의 해결책이 가능하다는 확신을주었습니다. 여기 우리를 위해 일한 것이 있습니다 :

    우리는 현재 SparkOnHBase (https://github.com/cloudera-labs/SparkOnHBase)의 RDD를 사용하고 있습니다. 그러나 https://github.com/cloudera-labs/SparkOnHBase/pull/7에서 제안 된 변경 사항을 통합했습니다. 이 끌어 오기 요청이 열려 있기 때문에 하위 클래스로 변경 내용을 구현할 수도 있습니다.

    import com.cloudera.spark.hbase.{HBaseContext, HBaseScanRDD}
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.security.UserGroupInformation
    import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
    import org.apache.spark.{SerializableWritable, SparkContext}
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.rdd.RDD
    
    class MyHBaseScanRDD (sc: SparkContext,
        @transient tableName: String,
        @transient scan: Scan,
        configBroadcast: Broadcast[SerializableWritable[Configuration]]) extends HBaseScanRDD(sc, tableName, scan, configBroadcast) {
      val jobCredentialBroadcast = sc.broadcast(new SerializableWritable(jobTransient.getCredentials))
    
      override def addCreds {
        val creds = SparkHadoopUtil.get.getCurrentUserCredentials
        @transient val ugi = UserGroupInformation.getCurrentUser
        ugi.addCredentials(creds)
        ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)
        ugi.addCredentials(jobCredentialBroadcast.value.value)
      }
    }
    
    class MyHBaseContext (sc: SparkContext,
        @transient config: Configuration,
        val tmpHdfsConfigFile: String = null) extends HBaseContext(sc, config, tmpHdfsConfigFile) {
      def myHBaseScanRDD(tableName: String, scan: Scan): RDD[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])] = {
        new MyHBaseScanRDD(sc, tableName, scan, broadcastedConf)
      }
    }
    
    val hc = HBaseConfiguration.create
    val scan = new Scan
    val hbaseContext = new MyHBaseContext(sc, hc)
    val rdd = hbaseContext.myHBaseScanRDD("tableName", scan)
    rdd.count
    

    이러한 변경 사항이 SpaseOnHBase의 후속 제품인 HBase의 HBase-Spark 모듈에 통합 된 것처럼 보입니다. 버전 관리 문제로 인해 새로운 HBase 라이브러리를 사용할 수 없지만이 문제에 직면 한 사람이라면 먼저 시도해보십시오.

  4. from https://stackoverflow.com/questions/35332026/issue-scala-code-in-spark-shell-to-retrieve-data-from-hbase by cc-by-sa and MIT license