[HADOOP] HBase 맵 전용 행 삭제
HADOOPHBase 맵 전용 행 삭제
HBase mapreduce를 처음 작성했을 때 HBase에서 행을 삭제하는 데 문제가 있습니다 (맵 전용 작업으로 실행하려고 시도 함). 작업이 성공하고 HBase 테이블을 스캔 할 수 있으며 매퍼에서 올바른 행 키를 HBase (sysout을 통해 확인)에서 읽습니다. 그러나 Delete del = new Delete (row.get ()) 호출이 실제로 아무것도하지 않는 것 같습니다.
아래는 내가 실행하려고하는 코드입니다.
HBaseDelete.java
public class HBaseDelete {
public static void main(String[] args) throws Exception {
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "log_table");
job.setJarByClass(HBaseDeleteMapper.class);
Scan scan = new Scan();
scan.setCaching(500);
scan.setCacheBlocks(false);
TableMapReduceUtil.initTableMapperJob("log_table", scan, HBaseDeleteMapper.class, null, null, job);
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
}
}
HBaseDeleteMapper.java
public class HBaseDeleteMapper extends TableMapper<ImmutableBytesWritable, Delete>{
@Override
public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
Delete delete = new Delete(row.get());
context.write(row, delete);
}
}
삭제를 '커밋'하기 위해 누락 된 것이 있습니까?
해결법
-
==============================
1.테이블이 아닌 컨텍스트에 쓰는 경우 매퍼가 다음과 비슷하게 보입니다.
테이블이 아닌 컨텍스트에 쓰는 경우 매퍼가 다음과 비슷하게 보입니다.
public class HBaseDeleteMapper extends TableMapper<ImmutableBytesWritable, NullWritable>{ private HTable myTable; protected void setup(Context context) throws IOException, InterruptedException { /* HTable instance for deletes */ myTable = new HTable(HBaseConfiguration.create(), "myTable".getBytes()); } public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException { myTable.delete(new Delete(row.get())); /* Delete the row from the table */ //context.write(row, NullWritable.get()); /* Just an output with deleted rows if you need it for something (avoid it if you not) */ } protected void cleanup(Context context) throws IOException, InterruptedException { myTable.close(); /* Close table */ } }
삭제 작업은 쓰기 버퍼를 사용하지 않습니다.이 코드는 삭제 당 1 개의 RPC 작업을 실행하므로 이러한 유형의 작업에는 적합하지 않습니다. 이를 해결하기 위해 고유 한 List
를 빌드하여 배치 할 수 있습니다. public class HBaseDeleteMapper extends TableMapper<NullWritable, NullWritable>{ private HTable myTable; private List<Delete> deleteList = new ArrayList<Delete>(); final private int buffer = 10000; /* Buffer size, tune it as desired */ protected void setup(Context context) throws IOException, InterruptedException { /* HTable instance for deletes */ myTable = new HTable(HBaseConfiguration.create(), "myTable".getBytes()); } public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException { deleteList.add(new Delete(row.get())); /* Add delete to the batch */ if (deleteList.size()==buffer) { myTable.delete(deleteList); /* Submit batch */ deleteList.clear(); /* Clear batch */ } } protected void cleanup(Context context) throws IOException, InterruptedException { if (deleteList.size()>0) { myTable.delete(deleteList); /* Submit remaining batch */ } myTable.close(); /* Close table */ } }
-
==============================
2.아래 코드는 공통 문자열을 포함하는 행 키를 hbase 테이블에서 스캔하고 목록의 크기가 1000보다 크면 삭제합니다 (목록이 공간 / 힙 공간을 벗어나지 않도록하는 것입니다). 당신의 hdfs에.
아래 코드는 공통 문자열을 포함하는 행 키를 hbase 테이블에서 스캔하고 목록의 크기가 1000보다 크면 삭제합니다 (목록이 공간 / 힙 공간을 벗어나지 않도록하는 것입니다). 당신의 hdfs에.
운전사
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.oclc.wcsync.hadoop.mapper.HbaseBulkDeleteMapper; import org.oclc.wcsync.hadoop.util.JobName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Properties; /** doc. */ public class HbaseBulkDelete extends Configured implements Tool{ /** doc. */ private static final Logger LOG = LoggerFactory.getLogger(HbaseBulkDelete.class); /** * doc. * @param args ... * @throws Exception ... */ public static void main(String[] args) throws Exception { int res = ToolRunner.run(HBaseConfiguration.create(), new HbaseBulkDelete(), args); System.exit(res); } @Override public int run(String[] strings) throws Exception { JobName jobName = JobName.HBASE_DELETE; LOG.info ("Got into class driver"); Configuration conf = HBaseConfiguration.create (); String env = "prod"; Properties hadoopProps = new Properties(); hadoopProps.load(HbaseBulkDelete.class.getResourceAsStream("/hadoop.config." + env + ".properties")); conf.set("jobName", jobName.name()); conf.set ("hbase.master.catalog.timeout","600000"); conf.set ("hbase.client.scanner.timeout.period","600000"); conf.set ("hbase.rpc.timeout","6000000"); conf.set ("mapred.task.timeout","6000000"); conf.set("mapreduce.map.memory.mb","4096"); Job job = new Job(conf); job.setJobName(jobName.format("HbaseBulkDelete")); job.setJarByClass(HbaseBulkDelete.class); Scan s = new Scan (); s.addFamily(Bytes.toBytes("data")); s.setStartRow (Bytes.toBytes ("Your_Substring")); TableMapReduceUtil.initTableMapperJob ("Ingest", s, HbaseBulkDeleteMapper.class, TextOutputFormat.class, TextOutputFormat.class, job); job.setNumReduceTasks(0); job.setOutputFormatClass(TextOutputFormat.class); FileOutputFormat.setOutputPath(job, new Path("/user/neethu/HbaseBulkDelete")); return job.waitForCompletion(true) ? 0 : -1; } }
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class HbaseBulkDeleteMapper extends TableMapper<Text, Text> { private static final Logger LOG = LoggerFactory.getLogger(HbaseBulkDeleteMapper.class); Configuration conf; @Override protected void setup(Context context) throws IOException, InterruptedException { conf = context.getConfiguration(); } List<Delete> listOfBatchDelete = new ArrayList<Delete> (); @Override public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException, InterruptedException { HTable table= new HTable(conf,"Ingest"); if (listOfBatchDelete != null && !listOfBatchDelete.isEmpty () && listOfBatchDelete.size () > 1000) { LOG.info ("Deleted records!"); listOfBatchDelete.clear (); } String KEY=Bytes.toString(values.getRow ()); try { if (KEY.contains ("Your_substring") ){ LOG.info ("RowKey:"+KEY ); Delete d=new Delete(Bytes.toBytes(KEY)); listOfBatchDelete.add(d); context.write (new Text ("RowKey"), new Text (KEY)); } } catch (Exception e) { LOG.error ("error ---" + e); } // table.delete(listOfBatchDelete); } }
from https://stackoverflow.com/questions/21291236/hbase-map-only-row-delete by cc-by-sa and MIT license
'HADOOP' 카테고리의 다른 글
[HADOOP] 다른 사용자로 맵 축소 작업을 실행 (0) | 2019.08.07 |
---|---|
[HADOOP] 하이브 테이블 생성 쿼리의 MismatchedTokenException (0) | 2019.08.07 |
[HADOOP] hadoop 스트리밍 : EMR에서 모듈 가져 오기 (0) | 2019.08.07 |
[HADOOP] 마스터 노드의“start-all.sh”및“start-dfs.sh”가 슬레이브 노드 서비스를 시작하지 않습니까? (0) | 2019.08.07 |
[HADOOP] 원사 클러스터에서 Spark 작업 제출 (0) | 2019.08.07 |