복붙노트

[HADOOP] Coprocesor HBase를 사용하여 보조 색인 만들기

HADOOP

Coprocesor HBase를 사용하여 보조 색인 만들기

prePut 후크를 사용하여 보조 인덱스를 만드는 자체 코 프로세서를 작성하려고했습니다. 시작하려면, 나는 단순히 prePut 코 프로세서를 작동 시키려고 노력했습니다. 지금까지 나는 코 프로세서가 전달 된 put 객체에 추가되도록 할 수 있습니다. 내가 찾은 것은 put 객체가 전달한 것과는 별도의 행에 쓸 코 프로세서를 얻을 수 없다는 것입니다. 분명히 보조 인덱스를 생성하려면이 인덱스를 찾아야합니다.

아래는 내 보조 프로세서 용 코드이지만 작동하지 않습니다. 예, 모든 테이블이 존재하며 'colfam1'도 존재합니다. HBase 버전 : Cloudera의 CDH4에서 HBase 0.92.1-cdh4.1.2

문제가 무엇인지 아는 사람 있습니까?

    @Override
        public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,final Put put, final WALEdit edit, final boolean writeToWAL) throws IOException {          
            KeyValue kv = new KeyValue(Bytes.toBytes("COPROCESSORROW"), Bytes.toBytes("colfam1"),Bytes.toBytes("COPROCESSOR: "+System.currentTimeMillis()),Bytes.toBytes("IT WORKED"));
            put.add(kv);
        }

다음 오류가 발생합니다.

    ERROR: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: IOException: 1 time, servers with issues:

최신 정보:

보조 프로세서를 다음과 같이 수정했지만 여전히 오류가 발생합니다. 이제 포스트 풋 (2 차 인덱스)이 작성되었지만 여전히 시간 초과 오류가 있습니다. 이 지역의 전체 테이블이 너무 추락하여이 지역을 다시 시작해야합니다. 때로는 영역 다시 시작이 작동하지 않고 전체 영역 (모든 테이블)  서버가 재구성되어야합니다.

나는 왜 ...?

@Override
      public void start(CoprocessorEnvironment env) throws IOException {        
        LOG.info("(start)");
        pool = new HTablePool(env.getConfiguration(), 10);
     }

    @Override
    public void postPut(final ObserverContext<RegionCoprocessorEnvironment> observerContext,final Put put,final WALEdit edit,final boolean writeToWAL) throws IOException {
        byte[] tableName  = observerContext.getEnvironment().getRegion().getRegionInfo().getTableName();

        //not necessary though if you register the coprocessor for the specific table , SOURCE_TBL
        if (!Bytes.equals(tableName, Bytes.toBytes(SOURCE_TABLE))) 
            return;         

        try {           
            LOG.info("STARTING postPut");
            HTableInterface table = pool.getTable(Bytes.toBytes(INDEX_TABLE));
            LOG.info("TURN OFF AUTOFLUSH");
            table.setAutoFlush(false);
            //create row              
            LOG.info("Creating new row");            
            byte [] rowkey = Bytes.toBytes("COPROCESSOR ROW");
            Put indexput  = new Put(rowkey); 
            indexput.add(Bytes.toBytes ( "data"),  Bytes.toBytes("CP: "+System.currentTimeMillis()),  Bytes.toBytes("IT WORKED!"));
            LOG.info("Writing to table");
            table.put(indexput);
            LOG.info("flushing commits");            
            table.flushCommits();
            LOG.info("close table");
            table.close();

        } catch ( IllegalArgumentException ex) {

            //handle excepion.
        }

      }


      @Override
      public void stop(CoprocessorEnvironment env) throws IOException {
        LOG.info("(stop)");
        pool.close();
      }

지역 서버 로그는 다음과 같습니다 (내 기록 주석에 유의하십시오).

2013-01-30 19:30:39,754 INFO my.package.MyCoprocessor: STARTING postPut
2013-01-30 19:30:39,754 INFO my.package.MyCoprocessor: TURN OFF AUTOFLUSH
2013-01-30 19:30:39,755 INFO my.package.MyCoprocessor: Creating new row
2013-01-30 19:30:39,755 INFO my.package.MyCoprocessor: Writing to table
2013-01-30 19:30:39,755 INFO my.package.MyCoprocessor: flushing commits
2013-01-30 19:31:39,813 WARN org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation: Failed all from region=test_table,,1359573731255.d41b77b31fafa6502a8f09db9c56b9d8., hostname=node01, port=60020
java.util.concurrent.ExecutionException: java.net.SocketTimeoutException: Call to node01/<private_ip>:60020 failed on socket timeout exception: java.net.SocketTimeoutException: 60000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/<private_ip>:56390 remote=node01/<private_ip>:60020]
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
    at java.util.concurrent.FutureTask.get(FutureTask.java:83)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.processBatchCallback(HConnectionManager.java:1557)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.processBatch(HConnectionManager.java:1409)
    at org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:949)
    at org.apache.hadoop.hbase.client.HTablePool$PooledHTable.flushCommits(HTablePool.java:449)
    at my.package.MyCoprocessor.postPut(MyCoprocessor.java:81)
    at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.postPut(RegionCoprocessorHost.java:682)
    at org.apache.hadoop.hbase.regionserver.HRegion.doMiniBatchPut(HRegion.java:1901)
    at org.apache.hadoop.hbase.regionserver.HRegion.put(HRegion.java:1742)
    at org.apache.hadoop.hbase.regionserver.HRegionServer.multi(HRegionServer.java:3102)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:364)
    at org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1345)

해결 방법 : 보조 프로세서의 동일한 테이블에 보조 프로세서를 쓰려고했습니다. 간단히 말해서 셀을 작성하면 CP가 셀을 작성하여 CP를 다시 트리거하고 다른 셀을 계속 작성하는 것입니다. 나는이 루프를 막기 위해 CP 행을 기록하는 행 점검 b4를함으로써 그것을 중단시켰다.

해결법

  1. ==============================

    1.아래는 Hbase에서 Coprocessors를 사용하여 보조 색인을 만드는 방법에 대한 코드 스 니펫입니다. 도움이 될 수 있습니다.

    아래는 Hbase에서 Coprocessors를 사용하여 보조 색인을 만드는 방법에 대한 코드 스 니펫입니다. 도움이 될 수 있습니다.

    public class TestCoprocessor extends BaseRegionObserver{
    
        private HTablePool pool = null;
    
        private final static String  INDEX_TABLE  = "INDEX_TBL";
        private final static String  SOURCE_TABLE = "SOURCE_TBL";
    
        @Override
        public void start(CoprocessorEnvironment env) throws IOException {  
            pool = new HTablePool(env.getConfiguration(), 10);
        }
    
        @Override
        public void postPut(
            final ObserverContext<RegionCoprocessorEnvironment> observerContext,
            final Put put,
            final WALEdit edit,
            final boolean writeToWAL)
            throws IOException {
    
            byte[] table = observerContext.getEnvironment(
                ).getRegion().getRegionInfo().getTableName();
    
            // Not necessary though if you register the coprocessor
            // for the specific table, SOURCE_TBL
            if (!Bytes.equals(table, Bytes.toBytes(SOURCE_TABLE))) {
                return; 
            }
    
            try {
                final List<KeyValue> filteredList = put.get(
                    Bytes.toBytes ( "colfam1"), Bytes.toBytes(" qaul"));
                filteredList.get( 0 ); //get the column value
    
                // get the values 
                HTableInterface table = pool.getTable(Bytes.toBytes(INDEX_TABLE));
    
                // create row key             
                byte [] rowkey = mkRowKey () //make the row key
                Put indexput = new Put(rowkey); 
                indexput.add(
                    Bytes.toBytes( "colfam1"),
                    Bytes.toBytes(" qaul"),
                    Bytes.toBytes(" value.."));
    
                table.put(indexput);
                table.close();
    
            } catch ( IllegalArgumentException ex) {
                // handle excepion.
            }
    
        }
    
    
        @Override
        public void stop(CoprocessorEnvironment env) throws IOException {
            pool.close();
        }
    
    }
    

    SOURCE_BL에 위 코 프로세서를 등록하려면 hbase 쉘로 이동하여 아래 단계를 따르십시오

  2. ==============================

    2.HBase에 보조 인덱스가 내장되었습니다. 같은 블로그 항목을보십시오. HBase에서 CoProcessor를 사용할 필요가 없습니다.

    HBase에 보조 인덱스가 내장되었습니다. 같은 블로그 항목을보십시오. HBase에서 CoProcessor를 사용할 필요가 없습니다.

  3. from https://stackoverflow.com/questions/14540167/create-secondary-index-using-coprocesor-hbase by cc-by-sa and MIT license