[HADOOP] HBase map-only row delete

Writing my first HBase MapReduce job (which I am trying to run as a map-only job), I am having trouble deleting rows from HBase. The job succeeds, I can scan the HBase table, and in the mapper I read the correct row keys from HBase (verified via sysout). However, the call to Delete del = new Delete(row.get()) does not seem to actually do anything.

Below is the code I am trying to run.

HBaseDelete.java

public class HBaseDelete { 
  public static void main(String[] args) throws Exception {

    Configuration config = HBaseConfiguration.create();
    Job job = new Job(config, "log_table");
    job.setJarByClass(HBaseDeleteMapper.class);     

    Scan scan = new Scan();
    scan.setCaching(500);        
    scan.setCacheBlocks(false);

    TableMapReduceUtil.initTableMapperJob("log_table", scan, HBaseDeleteMapper.class, null, null, job);

    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0);

    boolean b = job.waitForCompletion(true);
    if (!b) {
        throw new IOException("error with job!");
    }

  }
}

HBaseDeleteMapper.java

public class HBaseDeleteMapper extends TableMapper<ImmutableBytesWritable, Delete>{
  @Override
  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    Delete delete = new Delete(row.get());
    context.write(row, delete);
  }
}

Is there something I am missing to 'commit' the delete?

Solution

  1. ==============================

    To perform the deletes writing directly to the table instead of to the context, your mapper would look similar to this one:

    public class HBaseDeleteMapper extends TableMapper<ImmutableBytesWritable, NullWritable>{
    
        private HTable myTable;
    
        protected void setup(Context context) throws IOException, InterruptedException {
            /* HTable instance for deletes */
            myTable = new HTable(HBaseConfiguration.create(), "myTable".getBytes());
        }
    
        public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
            myTable.delete(new Delete(row.get())); /* Delete the row from the table */
            //context.write(row, NullWritable.get()); /* Just an output with the deleted rows if you need it for something (skip it if you don't) */
        }
    
        protected void cleanup(Context context) throws IOException, InterruptedException { 
            myTable.close(); /* Close table */
        }
    
    }
    

    Delete operations do not go through the client write buffer, so this code issues one RPC per delete, which is not good for this kind of job. To address that you can build your own List<Delete> and submit them in batches (a variant that keeps the question's original context.write(row, delete) approach is sketched after this code):

    public class HBaseDeleteMapper extends TableMapper<NullWritable, NullWritable>{
    
        private HTable myTable;
        private List<Delete> deleteList = new ArrayList<Delete>();
        final private int buffer = 10000; /* Buffer size, tune it as desired */
    
        protected void setup(Context context) throws IOException, InterruptedException {
            /* HTable instance for deletes */
            myTable = new HTable(HBaseConfiguration.create(), "myTable".getBytes());
        }
    
        public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
            deleteList.add(new Delete(row.get())); /* Add delete to the batch */
            if (deleteList.size()==buffer) {
                myTable.delete(deleteList); /* Submit batch */
                deleteList.clear(); /* Clear batch */
            }
        }
    
        protected void cleanup(Context context) throws IOException, InterruptedException {
            if (deleteList.size()>0) {
                myTable.delete(deleteList); /* Submit remaining batch */
            }
            myTable.close(); /* Close table */
        }
    
    }
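
    Alternatively, the question's original pattern of emitting (rowkey, Delete) pairs with context.write can also work, but only if the job's output format actually writes mutations back to HBase: with NullOutputFormat the emitted Delete objects simply go nowhere, while TableOutputFormat submits them to the table. A minimal driver sketch of that wiring, reusing the log_table and HBaseDeleteMapper from the question (the class name HBaseDeleteJob is just for illustration):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;

    public class HBaseDeleteJob {
        public static void main(String[] args) throws Exception {
            Configuration config = HBaseConfiguration.create();
            Job job = new Job(config, "log_table_delete");
            job.setJarByClass(HBaseDeleteMapper.class);

            Scan scan = new Scan();
            scan.setCaching(500);
            scan.setCacheBlocks(false);

            /* Same mapper as in the question: it emits (ImmutableBytesWritable, Delete) pairs */
            TableMapReduceUtil.initTableMapperJob("log_table", scan, HBaseDeleteMapper.class, null, null, job);

            /* Passing a null reducer configures TableOutputFormat for "log_table",
               so the emitted Delete objects are actually applied to the table */
            TableMapReduceUtil.initTableReducerJob("log_table", null, job);
            job.setNumReduceTasks(0); /* Keep the job map-only */

            if (!job.waitForCompletion(true)) {
                throw new IOException("error with job!");
            }
        }
    }

    The mapper from the question is left unchanged in this setup.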
    
  2. ==============================

    The code below scans the hbase table for row keys containing a common substring and deletes them once the list grows larger than 1000 (this is to make sure your list does not run out of space / heap space). It also writes these row keys to your hdfs.

    Driver

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.oclc.wcsync.hadoop.mapper.HbaseBulkDeleteMapper;
    import org.oclc.wcsync.hadoop.util.JobName;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Properties;
    
    /** doc. */
    public class HbaseBulkDelete extends Configured implements Tool{
    
        /** doc. */
        private static final Logger LOG = LoggerFactory.getLogger(HbaseBulkDelete.class);
    
    
        /**
         * doc.
         * @param args ...
         * @throws Exception ...
         */
        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(HBaseConfiguration.create(), new HbaseBulkDelete(), args);
            System.exit(res);
        }
    
        @Override
        public int run(String[] strings) throws Exception {
    
    
            JobName jobName = JobName.HBASE_DELETE;
            LOG.info ("Got into class driver");
            Configuration conf = HBaseConfiguration.create ();
            String env = "prod";
            Properties hadoopProps = new Properties();
            hadoopProps.load(HbaseBulkDelete.class.getResourceAsStream("/hadoop.config." + env + ".properties"));
            conf.set("jobName", jobName.name());
            conf.set ("hbase.master.catalog.timeout","600000");
            conf.set ("hbase.client.scanner.timeout.period","600000");
            conf.set ("hbase.rpc.timeout","6000000");
            conf.set ("mapred.task.timeout","6000000");
            conf.set("mapreduce.map.memory.mb","4096");
            Job job = new Job(conf);
            job.setJobName(jobName.format("HbaseBulkDelete"));
            job.setJarByClass(HbaseBulkDelete.class);
            Scan s = new Scan ();
            s.addFamily(Bytes.toBytes("data"));
            s.setStartRow (Bytes.toBytes ("Your_Substring"));
    
        TableMapReduceUtil.initTableMapperJob ("Ingest", s, HbaseBulkDeleteMapper.class, Text.class,
                Text.class, job);
    
            job.setNumReduceTasks(0);
            job.setOutputFormatClass(TextOutputFormat.class);
            FileOutputFormat.setOutputPath(job, new Path("/user/neethu/HbaseBulkDelete"));
    
    
            return job.waitForCompletion(true) ? 0 : -1;
        }
    }
    
    Mapper

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.Text;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    public class HbaseBulkDeleteMapper extends TableMapper<Text, Text> {
        private static final Logger LOG = LoggerFactory.getLogger(HbaseBulkDeleteMapper.class);

        private Configuration conf;
        private HTable table;
        private List<Delete> listOfBatchDelete = new ArrayList<Delete>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            conf = context.getConfiguration();
            table = new HTable(conf, "Ingest"); /* Table used to submit the batched deletes */
        }

        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context)
                throws IOException, InterruptedException {
            String key = Bytes.toString(values.getRow());
            if (key.contains("Your_substring")) {
                LOG.info("RowKey: " + key);
                listOfBatchDelete.add(new Delete(Bytes.toBytes(key)));
                context.write(new Text("RowKey"), new Text(key)); /* Record the deleted row key in HDFS */
            }
            if (listOfBatchDelete.size() > 1000) {
                table.delete(listOfBatchDelete); /* Submit the batch */
                LOG.info("Deleted records!");
                listOfBatchDelete.clear();
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            if (!listOfBatchDelete.isEmpty()) {
                table.delete(listOfBatchDelete); /* Submit the remaining deletes */
            }
            table.close();
        }
    }
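
    Note that HTable is deprecated in newer HBase client versions and removed in 2.x; the same batched-delete idea can be expressed against the Connection/Table API instead. A rough sketch under that assumption, keeping the Ingest table name from above but omitting the substring filter and the HDFS output to stay short (class and constant names are illustrative only):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.io.NullWritable;

    public class HBaseBatchDeleteMapper extends TableMapper<NullWritable, NullWritable> {

        private static final int BATCH_SIZE = 1000; /* Flush threshold, tune as desired */

        private Connection connection;
        private Table table;
        private final List<Delete> batch = new ArrayList<Delete>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            connection = ConnectionFactory.createConnection(context.getConfiguration());
            table = connection.getTable(TableName.valueOf("Ingest"));
        }

        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            batch.add(new Delete(row.get()));
            if (batch.size() >= BATCH_SIZE) {
                table.delete(batch); /* One batched round trip instead of one RPC per row */
                batch.clear();
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            if (!batch.isEmpty()) {
                table.delete(batch); /* Flush whatever is left */
            }
            table.close();
            connection.close();
        }
    }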
    
  3. from https://stackoverflow.com/questions/21291236/hbase-map-only-row-delete by cc-by-sa and MIT license