Contents:

Full table scan: Scan
Data filtering: Filter
Combining Scan and Filter
HBase batch import methods
Batch import with MapReduce
Batch import with BulkLoad
HBase batch export methods
Batch export with TableMapReduce
HBase built-in Export utility
Full table scan: Scan

Commonly used Scan Java APIs:
scan.addFamily();        // restrict the scan to one column family
scan.addColumn();        // restrict the scan to one column; if neither addFamily nor addColumn is called, all columns are returned
scan.readAllVersions();  // read all versions of each cell
scan.readVersions(3);    // read only the latest 3 versions of each cell
scan.setTimeRange();     // set a minimum and maximum timestamp; only cells inside this range are returned
scan.setTimeStamp();     // set a single exact timestamp
scan.setFilter();        // attach a Filter to drop unwanted data
scan.withStartRow();     // set the start row; if omitted, the scan starts from the first row of the table
scan.withStopRow();      // set the stop row (exclusive)
scan.setBatch();         // maximum number of cells returned per row, which protects against OutOfMemory errors on very wide rows
scan.setCaching();       // number of rows returned per underlying RPC; the default is 1, and raising it moderately improves scan performance at the cost of memory
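A minimal sketch (not from the original notes) of how several of these calls are typically combined. It assumes an already opened Table handle named table for an existing table with a column family "info"; exception handling is left to the enclosing method:

// Sketch only: configure versions, time range and caching on one Scan.
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("info"));              // only read the "info" column family
scan.readVersions(3);                               // keep at most 3 versions per cell
scan.setTimeRange(0L, System.currentTimeMillis());  // only cells with timestamps before "now"
scan.setCaching(100);                               // fetch 100 rows per RPC instead of the default
try (ResultScanner scanner = table.getScanner(scan)) {
    for (Result result : scanner) {
        System.out.println(Bytes.toString(result.getRow()));
    }
}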
Data filtering: Filter

RowFilter: filters rows by row key.
Match by regular expression: RegexStringComparator

Filter filter = new RowFilter(CompareOperator.EQUAL, new RegexStringComparator(".*x"));

Match by substring: SubstringComparator

Filter filter = new RowFilter(CompareOperator.EQUAL, new SubstringComparator("x"));

Match by prefix: BinaryPrefixComparator

Filter filter = new RowFilter(CompareOperator.EQUAL, new BinaryPrefixComparator(Bytes.toBytes("x")));
PrefixFilter: selects rows whose row key starts with a specific prefix.

Filter pf = new PrefixFilter(Bytes.toBytes("prefix"));
ValueFilter: filters by cell value; columns in a row whose values do not match are filtered out.

Filter vf = new ValueFilter(CompareOperator.EQUAL, new SubstringComparator("ROW2_QUAL1"));
Other CompareOperator values:

LESS              less than
LESS_OR_EQUAL     less than or equal to
EQUAL             equal to
NOT_EQUAL         not equal to
GREATER_OR_EQUAL  greater than or equal to
GREATER           greater than
NO_OP             matches nothing (excludes everything)

Combining Scan and Filter
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.List;

public class HbaseScanFilter {
    public static void main(String[] args) throws Exception {
        // Build the configuration
        Configuration conf = HBaseConfiguration.create();
        // ZooKeeper quorum used by HBase; separate multiple hosts with commas
        conf.set("hbase.zookeeper.quorum", "bigdata01:2181,bigdata02:2181,bigdata03:2181");
        // Root directory of HBase on HDFS
        conf.set("hbase.rootdir", "hdfs://bigdata01:9000/hbase");
        // Create the HBase connection, which handles DML operations (insert, delete, update, query)
        Connection conn = ConnectionFactory.createConnection(conf);
        // Get the Table object for the target table; the table must already exist
        Table table = conn.getTable(TableName.valueOf("s1"));

        Scan scan = new Scan();
        // Range query: restrict the scan interval to improve performance.
        // The interval is left-closed, right-open: the start row is included, the stop row is not.
        scan.withStartRow(Bytes.toBytes("a"));
        scan.withStopRow(Bytes.toBytes("f"));
        // Add a Filter: use RowFilter to keep only rows whose row key is less than or equal to "d"
        Filter filter = new RowFilter(CompareOperator.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("d")));
        scan.setFilter(filter);

        // Run the scan
        ResultScanner scanner = table.getScanner(scan);
        // Iterate over the results
        for (Result result : scanner) {
            List<Cell> cells = result.listCells();
            // Row key
            byte[] row_key = result.getRow();
            for (Cell cell : cells) {
                // Note: the values below are byte arrays; use new String(bytes) to convert them to strings
                // Column family
                byte[] family_bytes = CellUtil.cloneFamily(cell);
                // Column qualifier
                byte[] column_bytes = CellUtil.cloneQualifier(cell);
                // Value
                byte[] value_bytes = CellUtil.cloneValue(cell);
                System.out.println("Rowkey:" + new String(row_key) + ",column family:" + new String(family_bytes)
                        + ",column:" + new String(column_bytes) + ",value:" + new String(value_bytes));
            }
            System.out.println("================================================================");
        }

        // Close the connection
        table.close();
        conn.close();
    }
}
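When several conditions are needed at once, the filters described above can also be combined with FilterList (all classes from org.apache.hadoop.hbase.filter). The fragment below is not part of the original example, just a sketch of the standard client API applied to a Scan like the one above:

// Sketch: keep only cells whose row key starts with "a" AND whose value contains "x".
FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filters.addFilter(new PrefixFilter(Bytes.toBytes("a")));
filters.addFilter(new ValueFilter(CompareOperator.EQUAL, new SubstringComparator("x")));
scan.setFilter(filters);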
HBase batch import methods

There are two ways to batch-import data:

1. Use the support built into MapReduce: in the map phase, wrap each record in a Put and write it straight into the table.
2. Use BulkLoad: first use MapReduce to generate HFile files directly, then load those HFiles into the table.

Advantage of BulkLoad: MapReduce writes HBase's underlying HFile format directly and the files are then loaded into the table as-is, skipping most of the RPC and write-path overhead.
Batch import with MapReduce
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class BatchimportMR {

    public static class BatchimportMapper extends Mapper<LongWritable, Text, NullWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Expected input format: rowkey \t columnFamily \t qualifier \t value
            String[] strs = value.toString().split("\t");
            if (strs.length == 4) {
                String rowkey = strs[0];
                String columnFamily = strs[1];
                String name = strs[2];
                String val = strs[3];
                Put put = new Put(rowkey.getBytes());
                put.addColumn(columnFamily.getBytes(), name.getBytes(), val.getBytes());
                context.write(NullWritable.get(), put);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            // Exit if not enough arguments were passed
            System.exit(100);
        }
        String inPath = args[0];
        String outTableName = args[1];

        // Set the relevant configuration properties
        Configuration conf = new Configuration();
        conf.set("hbase.table.name", outTableName);
        conf.set("hbase.zookeeper.quorum", "bigdata01:2181,bigdata02:2181,bigdata03:2181");

        // Assemble the job
        Job job = Job.getInstance(conf, "Batch import Hbase Table:" + outTableName);
        job.setJarByClass(BatchimportMR.class);

        // Input path
        FileInputFormat.setInputPaths(job, new Path(inPath));

        // Map settings
        job.setMapperClass(BatchimportMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Put.class);

        TableMapReduceUtil.initTableReducerJob(outTableName, null, job);
        TableMapReduceUtil.addDependencyJars(job);

        // Disable the reduce phase
        job.setNumReduceTasks(0);

        job.waitForCompletion(true);
    }
}
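One possible way to submit this job; the jar name, the input path and the target table name are placeholders, not from the original notes. The input file must be tab-separated with four fields per line, matching the split("\t") and strs.length == 4 checks above. The HBase client jars usually have to be put on the MapReduce classpath first:

export HADOOP_CLASSPATH=$(hbase mapredcp)
hadoop jar hbase-import-example.jar BatchimportMR hdfs://bigdata01:9000/import_input batch1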
Batch import with BulkLoad

Step 1: generate the HFile files.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class BatchimportBulkLoad {

    public static class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Expected input format: rowkey \t columnFamily \t qualifier \t value
            String[] strs = value.toString().split("\t");
            if (strs.length == 4) {
                String rowkey = strs[0];
                String columnFamily = strs[1];
                String name = strs[2];
                String val = strs[3];
                ImmutableBytesWritable rowkeyWritable = new ImmutableBytesWritable(rowkey.getBytes());
                Put put = new Put(rowkey.getBytes());
                put.addColumn(columnFamily.getBytes(), name.getBytes(), val.getBytes());
                context.write(rowkeyWritable, put);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            // Exit if not enough arguments were passed
            System.exit(100);
        }
        String inPath = args[0];
        String outPath = args[1];
        String outTableName = args[2];

        // Set the relevant configuration properties
        Configuration conf = new Configuration();
        conf.set("hbase.table.name", outTableName);
        conf.set("hbase.zookeeper.quorum", "bigdata01:2181,bigdata02:2181,bigdata03:2181");

        // Assemble the job
        Job job = Job.getInstance(conf, "Batch import Hbase Table:" + outTableName);
        job.setJarByClass(BatchimportBulkLoad.class);

        // Input path
        FileInputFormat.setInputPaths(job, new Path(inPath));

        // Output path (delete it first if it already exists)
        FileSystem fs = FileSystem.get(conf);
        Path output = new Path(outPath);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);

        // Map settings
        job.setMapperClass(BulkLoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        // Disable the reduce phase
        job.setNumReduceTasks(0);

        Connection connection = ConnectionFactory.createConnection(conf);
        TableName tableName = TableName.valueOf(outTableName);
        // Let HFileOutputFormat2 configure the job so the generated HFiles match the table's regions
        HFileOutputFormat2.configureIncrementalLoad(job, connection.getTable(tableName), connection.getRegionLocator(tableName));

        job.waitForCompletion(true);
    }
}
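The original notes show only step 1. As described in the BulkLoad overview above, the generated HFiles still have to be loaded into the table in a second step. One common way is HBase's bundled bulk-load tool; the exact class name depends on the HBase version (org.apache.hadoop.hbase.tool.LoadIncrementalHFiles in HBase 2.x, org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles in 1.x), and the two arguments correspond to the outPath and outTableName arguments of the job above:

hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles <HFile output path> <table name>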
HBase batch export methods

There are two ways to batch-export data:

1. Write MapReduce code and use TableMapReduceUtil to read the table and export the data.
2. Use the Export utility that ships with HBase.

Batch export with TableMapReduce
package com.imooc.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class BatchExportTableMapReduceUtil {

    public static class BatchExportMapper extends TableMapper<Text, Text> {
        @Override
        protected void map(ImmutableBytesWritable key, Result result, Context context) throws IOException, InterruptedException {
            // key is the HBase row key; result is one row returned by the scan
            byte[] name = null;
            byte[] age = null;
            try {
                name = result.getValue("c1".getBytes(), "name".getBytes());
            } catch (Exception e) {}
            try {
                age = result.getValue("c1".getBytes(), "age".getBytes());
            } catch (Exception e) {}
            String v2 = ((name == null || name.length == 0) ? "NULL" : new String(name)) + "\t"
                    + ((age == null || age.length == 0) ? "NULL" : new String(age));
            context.write(new Text(key.get()), new Text(v2));
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            // Exit if not enough arguments were passed
            System.exit(100);
        }
        String inTableName = args[0];
        String outPath = args[1];

        // Set the relevant configuration properties
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "bigdata01:2181,bigdata02:2181,bigdata03:2181");

        // Assemble the job
        Job job = Job.getInstance(conf);
        job.setJarByClass(BatchExportTableMapReduceUtil.class);

        // Map settings
        job.setMapperClass(BatchExportMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Disable the reduce phase
        job.setNumReduceTasks(0);

        // Input: scan the whole HBase table
        TableMapReduceUtil.initTableMapperJob(inTableName, new Scan(), BatchExportMapper.class, Text.class, Text.class, job);

        // Output path
        FileOutputFormat.setOutputPath(job, new Path(outPath));

        job.waitForCompletion(true);
    }
}
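The export job can be submitted the same way as the import job above (with the HBase client jars on the classpath via hbase mapredcp); the jar name and the output path here are placeholders, and the table name batch1 matches the Export example below:

hadoop jar hbase-export-example.jar com.imooc.hbase.BatchExportTableMapReduceUtil batch1 hdfs://bigdata01:9000/batch_out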
HBase built-in Export utility

Export data directly with the Export utility that ships with HBase:
hbase org.apache.hadoop.hbase.mapreduce.Export <table name> <HDFS path>

hbase org.apache.hadoop.hbase.mapreduce.Export batch1 hdfs://bigdata01:9000/batch2
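A companion point not covered in the original notes: Export writes its output as Hadoop SequenceFiles, and HBase ships a matching Import utility that can load such an export back into an existing table, e.g.:

hbase org.apache.hadoop.hbase.mapreduce.Import batch1 hdfs://bigdata01:9000/batch2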