[HADOOP] Coprocesor HBase를 사용하여 보조 색인 만들기
HADOOPCoprocesor HBase를 사용하여 보조 색인 만들기
prePut 후크를 사용하여 보조 인덱스를 만드는 자체 코 프로세서를 작성하려고했습니다. 시작하려면, 나는 단순히 prePut 코 프로세서를 작동 시키려고 노력했습니다. 지금까지 나는 코 프로세서가 전달 된 put 객체에 추가되도록 할 수 있습니다. 내가 찾은 것은 put 객체가 전달한 것과는 별도의 행에 쓸 코 프로세서를 얻을 수 없다는 것입니다. 분명히 보조 인덱스를 생성하려면이 인덱스를 찾아야합니다.
아래는 내 보조 프로세서 용 코드이지만 작동하지 않습니다. 예, 모든 테이블이 존재하며 'colfam1'도 존재합니다. HBase 버전 : Cloudera의 CDH4에서 HBase 0.92.1-cdh4.1.2
문제가 무엇인지 아는 사람 있습니까?
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,final Put put, final WALEdit edit, final boolean writeToWAL) throws IOException {
KeyValue kv = new KeyValue(Bytes.toBytes("COPROCESSORROW"), Bytes.toBytes("colfam1"),Bytes.toBytes("COPROCESSOR: "+System.currentTimeMillis()),Bytes.toBytes("IT WORKED"));
put.add(kv);
}
다음 오류가 발생합니다.
ERROR: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: IOException: 1 time, servers with issues:
최신 정보:
보조 프로세서를 다음과 같이 수정했지만 여전히 오류가 발생합니다. 이제 포스트 풋 (2 차 인덱스)이 작성되었지만 여전히 시간 초과 오류가 있습니다. 이 지역의 전체 테이블이 너무 추락하여이 지역을 다시 시작해야합니다. 때로는 영역 다시 시작이 작동하지 않고 전체 영역 (모든 테이블) 서버가 재구성되어야합니다.
나는 왜 ...?
@Override
public void start(CoprocessorEnvironment env) throws IOException {
LOG.info("(start)");
pool = new HTablePool(env.getConfiguration(), 10);
}
@Override
public void postPut(final ObserverContext<RegionCoprocessorEnvironment> observerContext,final Put put,final WALEdit edit,final boolean writeToWAL) throws IOException {
byte[] tableName = observerContext.getEnvironment().getRegion().getRegionInfo().getTableName();
//not necessary though if you register the coprocessor for the specific table , SOURCE_TBL
if (!Bytes.equals(tableName, Bytes.toBytes(SOURCE_TABLE)))
return;
try {
LOG.info("STARTING postPut");
HTableInterface table = pool.getTable(Bytes.toBytes(INDEX_TABLE));
LOG.info("TURN OFF AUTOFLUSH");
table.setAutoFlush(false);
//create row
LOG.info("Creating new row");
byte [] rowkey = Bytes.toBytes("COPROCESSOR ROW");
Put indexput = new Put(rowkey);
indexput.add(Bytes.toBytes ( "data"), Bytes.toBytes("CP: "+System.currentTimeMillis()), Bytes.toBytes("IT WORKED!"));
LOG.info("Writing to table");
table.put(indexput);
LOG.info("flushing commits");
table.flushCommits();
LOG.info("close table");
table.close();
} catch ( IllegalArgumentException ex) {
//handle excepion.
}
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
LOG.info("(stop)");
pool.close();
}
지역 서버 로그는 다음과 같습니다 (내 기록 주석에 유의하십시오).
2013-01-30 19:30:39,754 INFO my.package.MyCoprocessor: STARTING postPut
2013-01-30 19:30:39,754 INFO my.package.MyCoprocessor: TURN OFF AUTOFLUSH
2013-01-30 19:30:39,755 INFO my.package.MyCoprocessor: Creating new row
2013-01-30 19:30:39,755 INFO my.package.MyCoprocessor: Writing to table
2013-01-30 19:30:39,755 INFO my.package.MyCoprocessor: flushing commits
2013-01-30 19:31:39,813 WARN org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation: Failed all from region=test_table,,1359573731255.d41b77b31fafa6502a8f09db9c56b9d8., hostname=node01, port=60020
java.util.concurrent.ExecutionException: java.net.SocketTimeoutException: Call to node01/<private_ip>:60020 failed on socket timeout exception: java.net.SocketTimeoutException: 60000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/<private_ip>:56390 remote=node01/<private_ip>:60020]
at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
at java.util.concurrent.FutureTask.get(FutureTask.java:83)
at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.processBatchCallback(HConnectionManager.java:1557)
at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.processBatch(HConnectionManager.java:1409)
at org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:949)
at org.apache.hadoop.hbase.client.HTablePool$PooledHTable.flushCommits(HTablePool.java:449)
at my.package.MyCoprocessor.postPut(MyCoprocessor.java:81)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.postPut(RegionCoprocessorHost.java:682)
at org.apache.hadoop.hbase.regionserver.HRegion.doMiniBatchPut(HRegion.java:1901)
at org.apache.hadoop.hbase.regionserver.HRegion.put(HRegion.java:1742)
at org.apache.hadoop.hbase.regionserver.HRegionServer.multi(HRegionServer.java:3102)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:364)
at org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1345)
해결 방법 : 보조 프로세서의 동일한 테이블에 보조 프로세서를 쓰려고했습니다. 간단히 말해서 셀을 작성하면 CP가 셀을 작성하여 CP를 다시 트리거하고 다른 셀을 계속 작성하는 것입니다. 나는이 루프를 막기 위해 CP 행을 기록하는 행 점검 b4를함으로써 그것을 중단시켰다.
해결법
-
==============================
1.아래는 Hbase에서 Coprocessors를 사용하여 보조 색인을 만드는 방법에 대한 코드 스 니펫입니다. 도움이 될 수 있습니다.
아래는 Hbase에서 Coprocessors를 사용하여 보조 색인을 만드는 방법에 대한 코드 스 니펫입니다. 도움이 될 수 있습니다.
public class TestCoprocessor extends BaseRegionObserver{ private HTablePool pool = null; private final static String INDEX_TABLE = "INDEX_TBL"; private final static String SOURCE_TABLE = "SOURCE_TBL"; @Override public void start(CoprocessorEnvironment env) throws IOException { pool = new HTablePool(env.getConfiguration(), 10); } @Override public void postPut( final ObserverContext<RegionCoprocessorEnvironment> observerContext, final Put put, final WALEdit edit, final boolean writeToWAL) throws IOException { byte[] table = observerContext.getEnvironment( ).getRegion().getRegionInfo().getTableName(); // Not necessary though if you register the coprocessor // for the specific table, SOURCE_TBL if (!Bytes.equals(table, Bytes.toBytes(SOURCE_TABLE))) { return; } try { final List<KeyValue> filteredList = put.get( Bytes.toBytes ( "colfam1"), Bytes.toBytes(" qaul")); filteredList.get( 0 ); //get the column value // get the values HTableInterface table = pool.getTable(Bytes.toBytes(INDEX_TABLE)); // create row key byte [] rowkey = mkRowKey () //make the row key Put indexput = new Put(rowkey); indexput.add( Bytes.toBytes( "colfam1"), Bytes.toBytes(" qaul"), Bytes.toBytes(" value..")); table.put(indexput); table.close(); } catch ( IllegalArgumentException ex) { // handle excepion. } } @Override public void stop(CoprocessorEnvironment env) throws IOException { pool.close(); } }
SOURCE_BL에 위 코 프로세서를 등록하려면 hbase 쉘로 이동하여 아래 단계를 따르십시오
-
==============================
2.HBase에 보조 인덱스가 내장되었습니다. 같은 블로그 항목을보십시오. HBase에서 CoProcessor를 사용할 필요가 없습니다.
HBase에 보조 인덱스가 내장되었습니다. 같은 블로그 항목을보십시오. HBase에서 CoProcessor를 사용할 필요가 없습니다.
from https://stackoverflow.com/questions/14540167/create-secondary-index-using-coprocesor-hbase by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 하이브에 사용 가능한 데이터 시각화 도구 (0) | 2019.07.23 |
---|---|
[HADOOP] 출력 쓰기시 Hadoop지도 축소 작업이 실패합니다. (0) | 2019.07.23 |
[HADOOP] Hadoop - 감속기는 어떻게 데이터를 가져 옵니까? (0) | 2019.07.23 |
[HADOOP] HBase가 분산 된 환경에서 실행되지 않는다면 의미가 있습니까? (0) | 2019.07.23 |
[HADOOP] Hadoop 파티션 (0) | 2019.07.23 |