HBase高级用法

HBase高级用法,第1张

HBase高级用法

文章目录

全表扫描 Scan数据过滤 Filter

Scan+Filter结合用法 Hbase批量导入方法

批量导入之MapReduce方法批量导入之BulkLoad方法 Hbase批量导出方法

批量导出之TableMapReduce方法Hbase内置工具类

全表扫描 Scan

常用Scan Java API:

scan.addFamily(); //指定列族
scan.addColumn(); //指定列,如果没有调用任何addFamily或Column,会返回所有的columns;
scan.readAllVersions(); //读取所有版本数据。
scan.readVersions(3); //读取最新3个版本的数据
scan.setTimeRange(); //指定最大的时间戳和最小的时间戳,只有在此范围内的cell才能被获取.
scan.setTimeStamp(); //指定时间戳
scan.setFilter(); //指定Filter来过滤掉不需要的信息
scan.withStartRow(); //指定开始的行。如果不指定,则从表头开始
scan.withStopRow(); //指定结束的行(不含此行)
scan.setBatch(); //指定最多返回的Cell数目。用于防止一行中有过多的数据,导致OutofMemory错误。
scan.setCaching(); //指定scan底层每次连接返回的数据条数,默认值为1,适当调大可以提高查询性能,设置太大会比较耗内存
数据过滤 Filter

RowFilter:对Rowkey进行过滤。

正则表达式:RegexStringComparator

Filter filter = new RowFilter(CompareOperator.EQUAL,new RegexStringComparator(".*x"));

监测字符串:SubstringComparator

Filter filter = new RowFilter(CompareOperator.EQUAL, new SubstringComparator(“x”));

开头:BinaryPrefixComparato

Filter filter = new RowFilter(CompareOperator.EQUAL, new BinaryPrefixComparator(Bytes.toBytes(“x”)));

PrefixFilter:筛选出具有特定前缀的行键的数据

Filter pf = new PrefixFilter(Bytes.toBytes(“前缀”));

ValueFilter:按照具体的值来筛选列中的数据,这会把一行中值不能满足的列过滤掉

Filter vf = new ValueFilter(CompareOperator.EQUAL, new SubstringComparator(“ROW2_QUAL1”));

CompareOperator.其他比较参数

	LESS   				小于   
	LESS_OR_EQUAL 	 	小于等于   
	EQUAL 				等于   
	NOT_EQUAL  			不等于   
	GREATER_OR_EQUAL 	大于等于
	GREATER  			大于
	NO_OP 				排除所有 
Scan+Filter结合用法

public class HbaseScanFilter {
    public static void main(String[] args) throws Exception{
        //获取配置
        Configuration conf = HbaseConfiguration.create();
        //指定Hbase使用的zk的地址,多个都逗号隔开
        conf.set("hbase.zookeeper.quorum", "bigdata01:2181,bigdata02:2181,bigdata03:2181");
        //指定Hbase在hdfs上的根目录
        conf.set("hbase.rootdir","hdfs://bigdata01:9000/hbase");
        //创建Hbase连接,负责对Hbase中数据的增删改查(DML *** 作)
        Connection conn = ConnectionFactory.createConnection(conf);
        //获取Table对象,指定要 *** 作的表名,表需要提前创建好
        Table table = conn.getTable(TableName.valueOf("s1"));

        Scan scan = new Scan();
        //范围查询:指定查询区间,提高查询性能
        //这是一个左闭右开的区间,也就是查询的结果中包含左边的,不包含右边的
        scan.withStartRow(Bytes.toBytes("a"));
        scan.withStopRow(Bytes.toBytes("f"));

        //添加Filter对数据进行过滤:使用RowFilter进行过滤,获取Rowkey小于等于d的数据
        Filter filter = new RowFilter(CompareOperator.LESS_OR_EQUAL,new BinaryComparator(Bytes.toBytes("d")));
        scan.setFilter(filter);

        //获取查询结果
        ResultScanner scanner = table.getScanner(scan);
        //迭代查询结果
        for (Result result: scanner) {
            List cells = result.listCells();
            //RowKey
            byte[] row_key = result.getRow();
            for (Cell cell: cells) {
                //注意:下面获取的信息都是字节类型的,可以通过new String(bytes)转为字符串
                //列族
                byte[] famaily_bytes = CellUtil.cloneFamily(cell);
                //列
                byte[] column_bytes = CellUtil.cloneQualifier(cell);
                //值
                byte[] value_bytes = CellUtil.clonevalue(cell);
                System.out.println("Rowkey:"+new String(row_key)+",列族:"+new String(famaily_bytes)+",列:"+new String(column_bytes)+",值:"+new String(value_bytes));
            }
            System.out.println("================================================================");
        }

        //关闭连接
        table.close();
        conn.close();
    }
}


Hbase批量导入方法

批量导入两种方式:

利用MapReduce中封装好的方法。在map阶段,把数据封装成Put *** 作,直接将数据入库。利用Bulkload。首先使用MapReduce直接生成HFile文件,然后再通过Bulkload将HFile文件直接加载到表中。
Bulkload的优势:通过MapReduce生成Hbase底层HFile文件,直接加载到表中,省去了大部分的RPC和写过程。 批量导入之MapReduce方法

public class BatchimportMR {
    public static class BatchimportMapper extends Mapper{
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("t");
            if(strs.length==4){
                String rowkey = strs[0];
                String columnFamily = strs[1];
                String name = strs[2];
                String val = strs[3];
                
                Put put = new Put(rowkey.getBytes());
                put.addColumn(columnFamily.getBytes(),name.getBytes(),val.getBytes());
                context.write(NullWritable.get(),put);
            }
        }
    }

    public static void main(String[] args) throws Exception{
        if(args.length!=2){
            //如果传递的参数不够,程序直接退出
            System.exit(100);
        }

        String inPath = args[0];
        String outTableName = args[1];
        //设置属性对应参数
        Configuration conf = new Configuration();
        conf.set("hbase.table.name",outTableName);
        conf.set("hbase.zookeeper.quorum","bigdata01:2181,bigdata02:2181,bigdata03:2181");

        //封装Job
        Job job = Job.getInstance(conf, "Batch import Hbase Table:" + outTableName);
        job.setJarByClass(BatchimportMR.class);

        //指定输入路径
        FileInputFormat.setInputPaths(job,new Path(inPath));

        //指定map相关的代码
        job.setMapperClass(BatchimportMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Put.class);

        TableMapReduceUtil.initTableReducerJob(outTableName,null,job);
        TableMapReduceUtil.addDependencyJars(job);

        //禁用Reduce
        job.setNumReduceTasks(0);

        job.waitForCompletion(true);
    }
}
批量导入之BulkLoad方法

图1 bulkLoad方法实现批量导入示意图

第一步:生成HFile文件

public class BatchimportBulkLoad {
    public static class BulkLoadMapper extends Mapper{
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("t");
            if(strs.length==4){
                String rowkey = strs[0];
                String columnFamily = strs[1];
                String name = strs[2];
                String val = strs[3];
                ImmutableBytesWritable rowkeyWritable = new ImmutableBytesWritable(rowkey.getBytes());
                Put put = new Put(rowkey.getBytes());
                put.addColumn(columnFamily.getBytes(),name.getBytes(),val.getBytes());
                context.write(rowkeyWritable,put);
            }
        }
    }

    public static void main(String[] args) throws Exception{
        if(args.length!=3){
            //如果传递的参数不够,程序直接退出
            System.exit(100);
        }

        String inPath = args[0];
        String outPath = args[1];
        String outTableName = args[2];

        //设置属性对应参数
        Configuration conf = new Configuration();
        conf.set("hbase.table.name",outTableName);
        conf.set("hbase.zookeeper.quorum","bigdata01:2181,bigdata02:2181,bigdata03:2181");

        //封装Job
        Job job = Job.getInstance(conf, "Batch import Hbase Table:" + outTableName);
        job.setJarByClass(BatchimportBulkLoad.class);

        //指定输入路径
        FileInputFormat.setInputPaths(job,new Path(inPath));

        //指定输出路径[如果输出路径存在,就将其删除]
        FileSystem fs = FileSystem.get(conf);
        Path output = new Path(outPath);
        if(fs.exists(output)){
            fs.delete(output,true);
        }
        FileOutputFormat.setOutputPath(job, output);

        //指定map相关的代码
        job.setMapperClass(BulkLoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        //禁用Reduce
        job.setNumReduceTasks(0);

        Connection connection = ConnectionFactory.createConnection(conf);
        TableName tableName = TableName.valueOf(outTableName);
        HFileOutputFormat2.configureIncrementalLoad(job,connection.getTable(tableName),connection.getRegionLocator(tableName));


        job.waitForCompletion(true);
    }
}
Hbase批量导出方法

批量导出两种方式:

开发MapReduce代码,利用TableMapReduceUtil将数据导出利用Hbase内部提供的Export工具类 批量导出之TableMapReduce方法

package com.imooc.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class BatchExportTableMapReduceUtil {
    public static class BatchExportMapper extends TableMapper{
        @Override
        protected void map(ImmutableBytesWritable key, Result result, Context context)
                throws IOException, InterruptedException {
            //key在这里就是hbase的Rowkey
            //result是scan返回的每行结果
            byte[] name = null;
            byte[] age = null;
            try{
                name = result.getValue("c1".getBytes(), "name".getBytes());
            }catch (Exception e){}
            try{
                age = result.getValue("c1".getBytes(), "age".getBytes());
            }catch (Exception e){}

            String v2 = ((name==null || name.length==0)?"NULL":new String(name))+"t"+
            			((age==null || age.length==0)?"NULL":new String(age));

            context.write(new Text(key.get()),new Text(v2));
        }
    }

    public static void main(String[] args) throws Exception{
        if(args.length!=2){
            //如果传递的参数不够,程序直接退出
            System.exit(100);
        }

        String inTableName = args[0];
        String outPath = args[1];

        //设置属性对应参数
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum","bigdata01:2181,bigdata02:2181,bigdata03:2181");

        //组装Job
        Job job = Job.getInstance(conf);
        job.setJarByClass(BatchExportTableMapReduceUtil.class);

        //设置map相关的配置
        job.setMapperClass(BatchExportMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        //禁用Reduce
        job.setNumReduceTasks(0);

        //设置输入信息
        TableMapReduceUtil.initTableMapperJob(inTableName,
        										new Scan(),
        										BatchExportMapper.class,
        										Text.class,
        										Text.class,job);

        //设置输出路径
        FileOutputFormat.setOutputPath(job,new Path(outPath));

        job.waitForCompletion(true);

    }
}


Hbase内置工具类

使用Hbase提供的Export工具类直接导出数据

hbase org.apache.hadoop.hbase.mapreduce.Export   表名  hdfs地址
hbase org.apache.hadoop.hbase.mapreduce.Export batch1 hdfs://bigdata01:9000/batch2

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5705295.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存