如何使用Java API *** 作Hbase

如何使用Java API *** 作Hbase,第1张

写了个Hbase新的api的增删改查的工具类,以供参考,直接拷贝代码就能用,散仙觉得基础的功能,都有了,代码如下:

package comdhgatehbasetest;

import javautilArrayList;

import javautilList;

import orgapachehadoopconfConfiguration;

import orgapachehadoophbaseCell;

import orgapachehadoophbaseCellUtil;

import orgapachehadoophbaseHBaseConfiguration;

import orgapachehadoophbaseHColumnDescriptor;

import orgapachehadoophbaseHTableDescriptor;

import orgapachehadoophbaseTableName;

import orgapachehadoophbaseclientDelete;

import orgapachehadoophbaseclientGet;

import orgapachehadoophbaseclientHBaseAdmin;

import orgapachehadoophbaseclientHTable;

import orgapachehadoophbaseclientPut;

import orgapachehadoophbaseclientResult;

import orgapachehadoophbaseclientResultScanner;

import orgapachehadoophbaseclientScan;

import orgapachehadoophbasefilterPageFilter;

import orgapachehadoophbasefilterPrefixFilter;

import orgapachehadoophbaseutilBytes;

/

基于新的API

Hbase096版本

写的工具类

@author qindongliang

数据技术交流群: 376932160

/

public class HbaseCommons {

static Configuration conf=HBaseConfigurationcreate();

static String tableName="";

public static void main(String[] args)throws Exception {

//String tableName="test";

//createTable(tableName, null);

}

/

批量添加数据

@param tableName 标名字

@param rows rowkey行健的集合

本方法仅作示例,其他的内容需要看自己义务改变

/

public static void insertList(String tableName,String rows[])throws Exception{

HTable table=new HTable(conf, tableName);

List<Put> list=new ArrayList<Put>();

for(String r:rows){

Put p=new Put(BytestoBytes(r));

//此处示例添加其他信息

//padd(BytestoBytes("family"),BytestoBytes("column"), 1000, BytestoBytes("value"));

listadd(p);

}

tableput(list);//批量添加

tableclose();//释放资源

}

/

创建一个表

@param tableName 表名字

@param columnFamilys 列簇

/

public static void createTable(String tableName,String[] columnFamilys)throws Exception{

//admin 对象

HBaseAdmin admin=new HBaseAdmin(conf);

if(admintableExists(tableName)){

Systemoutprintln("此表,已存在!");

}else{

//旧的写法

//HTableDescriptor tableDesc=new HTableDescriptor(tableName);

//新的api

HTableDescriptor tableDesc=new HTableDescriptor(TableNamevalueOf(tableName));

for(String columnFamily:columnFamilys){

tableDescaddFamily(new HColumnDescriptor(columnFamily));

}

admincreateTable(tableDesc);

Systemoutprintln("建表成功!");

}

adminclose();//关闭释放资源

}

/

删除一个表

@param tableName 删除的表名

/

public static void deleteTable(String tableName)throws Exception{

HBaseAdmin admin=new HBaseAdmin(conf);

if(admintableExists(tableName)){

admindisableTable(tableName);//禁用表

admindeleteTable(tableName);//删除表

Systemoutprintln("删除表成功!");

}else{

Systemoutprintln("删除的表不存在!");

}

adminclose();

}

/

插入一条数据

@param tableName 表明

@param columnFamily 列簇

@param column 列

@param value 值

/

public static void insertOneRow(String tableName,String rowkey,String columnFamily,String column,String value)throws Exception{

HTable table=new HTable(conf, tableName);

Put put=new Put(BytestoBytes(rowkey));

putadd(BytestoBytes(columnFamily), BytestoBytes(column), BytestoBytes(value));

tableput(put);//放入表

tableclose();//释放资源

}

/

删除一条数据

@param tableName 表名

@param row rowkey行键

/

public static void deleteOneRow(String tableName,String row)throws Exception{

HTable table=new HTable(conf, tableName);

Delete delete=new Delete(BytestoBytes(row));

tabledelete(delete);

tableclose();

}

/

删除多条数据

@param tableName 表名

@param rows 行健集合

/

public static void deleteList(String tableName,String rows[])throws Exception{

HTable table=new HTable(conf, tableName);

List<Delete> list=new ArrayList<Delete>();

for(String row:rows){

Delete del=new Delete(BytestoBytes(row));

listadd(del);

}

tabledelete(list);

tableclose();//释放资源

}

/

获取一条数据,根据rowkey

@param tableName 表名

@param row 行健

/

public static void getOneRow(String tableName,String row)throws Exception{

HTable table=new HTable(conf, tableName);

Get get=new Get(BytestoBytes(row));

Result result=tableget(get);

printRecoder(result);//打印记录

tableclose();//释放资源

}

/

查看某个表下的所有数据

@param tableName 表名

/

public static void showAll(String tableName)throws Exception{

HTable table=new HTable(conf, tableName);

Scan scan=new Scan();

ResultScanner rs=tablegetScanner(scan);

for(Result r:rs){

printRecoder(r);//打印记录

}

tableclose();//释放资源

}

/

查看某个表下的所有数据

@param tableName 表名

@param rowKey 行健

/

public static void ScanPrefixByRowKey(String tableName,String rowKey)throws Exception{

HTable table=new HTable(conf, tableName);

Scan scan=new Scan();

scansetFilter(new PrefixFilter(BytestoBytes(rowKey)));

ResultScanner rs=tablegetScanner(scan);

for(Result r:rs){

printRecoder(r);//打印记录

}

tableclose();//释放资源

}

/

查看某个表下的所有数据

@param tableName 表名

@param rowKey 行健扫描

@param limit 限制返回数据量

/

public static void ScanPrefixByRowKeyAndLimit(String tableName,String rowKey,long limit)throws Exception{

HTable table=new HTable(conf, tableName);

Scan scan=new Scan();

scansetFilter(new PrefixFilter(BytestoBytes(rowKey)));

scansetFilter(new PageFilter(limit));

ResultScanner rs=tablegetScanner(scan);

for(Result r:rs){

printRecoder(r);//打印记录

}

tableclose();//释放资源

}

/

根据rowkey扫描一段范围

@param tableName 表名

@param startRow 开始的行健

@param stopRow 结束的行健

/

public void scanByStartAndStopRow(String tableName,String startRow,String stopRow)throws Exception{

HTable table=new HTable(conf, tableName);

Scan scan=new Scan();

scansetStartRow(BytestoBytes(startRow));

scansetStopRow(BytestoBytes(stopRow));

ResultScanner rs=tablegetScanner(scan);

for(Result r:rs){

printRecoder(r);

}

tableclose();//释放资源

}

/

扫描整个表里面具体的某个字段的值

@param tableName 表名

@param columnFalimy 列簇

@param column 列

/

public static void getValueDetail(String tableName,String columnFalimy,String column)throws Exception{

HTable table=new HTable(conf, tableName);

Scan scan=new Scan();

ResultScanner rs=tablegetScanner(scan);

for(Result r:rs){

Systemoutprintln("值: " +new String(rgetValue(BytestoBytes(columnFalimy), BytestoBytes(column))));

}

tableclose();//释放资源

}

/

打印一条记录的详情

/

public static void printRecoder(Result result)throws Exception{

for(Cell cell:resultrawCells()){

Systemoutprint("行健: "+new String(CellUtilcloneRow(cell)));

Systemoutprint("列簇: "+new String(CellUtilcloneFamily(cell)));

Systemoutprint(" 列: "+new String(CellUtilcloneQualifier(cell)));

Systemoutprint(" 值: "+new String(CellUtilcloneValue(cell)));

Systemoutprintln("时间戳: "+cellgetTimestamp());

}

}

}

1、要在HBase表中实现索引,可以使用Regions建立列族和表,并通过对该列采用IndexTable设置索引参数来获得。

2、原因是HBase是一个分布式数据库,其中的数据都是有序的,可以利用这一有序性来获得更快的查询效果。

hbase snapshot数据迁移问题

不需要提前建表,分区也会自动同步

HBase自身也提供了ExportSnapshot的方法可以从HDFS文件层基于某个快照快速的导出HBase的数据,并不会对RegionServer造成影响,但该源生的方法不支持增量

1、在源集群执行

snapshot 'src_table', 'snapshot_src_table'

snapshot的流程主要有三个步骤

加锁: 加锁对象是regionserver的memstore,目的是禁止在创建snapshot过程中对数据进行insert,update,delete *** 作

刷盘:刷盘是针对当前还在memstore中的数据刷到HDFS上,保证快照数据相对完整,此步也不是强制的,如果不刷会,快照中数据有不一致风险

创建指针: snapshot过程不拷贝数据,但会创建对HDFS文件的指针,snapshot中存储的就是这些指针元数据

2、在源集群执行,属于推送方式,在目标集群执行数据拉取方式

hbase orgapachehadoophbasesnapshotExportSnapshot -snapshot test_snap -copy-from hdfs://HDFS80386/hbase -copy-to hdfs://shyt-hadoop-4031xxcomcn:8020/apps/hbase/data -mappers 20 -bandwidth 5

3、在目标集群执行使用hbase用户

disable 'dalishen:bbs_member'

restore_snapshot 'bbs_member_snap'

使用restore命令在目标集群自动新建表,以及与archive里的HFile建立link

执行该步骤的时候,可能会遇到权限问题,需要赋权限

Caused by: orgapachehadoopipcRemoteException(orgapachehadoopsecurityAccessControlException): Permission denied: user=hbase, access=WRITE, inode="/apps/hbase/data/archive/data/dalishen/bbs_member/f9406f2ff1fe4d542a5cc36b850c2689/f/links-91a554a73b1e41a7a0b33208331d62df":hadoop:hdfs:drwxr-xr-x

源集群

groups hadoop hdfs 可以发现导入的是源集群的权限

所以需要赋权限

hdfs dfs -chmod -R 777 /apps/hbase/data/archive/data/dalishen/bbs_member/

enable 'dalishen:bbs_member'

不需要提前建表,分区也会自动同步,支持增量备份,需要指定要备份的时间范围

copyTable也是属于HBase数据迁移的工具之一,以表级别进行数据迁移。copyTable的本质也是利用MapReduce进行同步的,与DistCp不同的时,它是利用MR去scan 原表的数据,然后把scan出来的数据写入到目标集群的表。这种方式也有很多局限,如一个表数据量达到T级,同时又在读写的情况下,全量scan表无疑会对集群性能造成影响。

13->11 高到低版本 不需要提前建表,分区也会自动同步

检查是否开启同步

echo "list_replicated_tables" | hbase shell -n |grep dalishen:app_deviceid

没有的话执行

enable_table_replication 'tname'

1源集群hadoop查询数据量,如太大先别迁移超过5000w

hbase orgapachehadoophbasemapreduceRowCounter 'dalishen:app_deviceid'

2源集群上执行 替换表名

hbase orgapachehadoophbasemapreduceCopyTable -Dhbaseclientscannercaching=1000 -Dmapredmaptasksspeculativeexecution=false -D mapreducetasktimeout=6000000 --families=f:f --peeradr=10522442:2181:/hbase-unsecure --newname=dalishen:app_deviceid dalishen:app_deviceid

3目标集群上执行数据量对比下

hbase orgapachehadoophbasemapreduceRowCounter 'dalishen:app_deviceid'

4指定时间戳进行增量同步

hbase orgapachehadoophbasemapreduceCopyTable -Dhbaseclientscannercaching=1000 -Dmapredmaptasksspeculativeexecution=false -D mapreducetasktimeout=6000000 --starttime=1600792683760 --endtime=1600792684760 --families=f:f --peeradr=17218127:2181:/hbase --newname=testwang testwang

在源集群进入hbase shell

1、 add_peer '1', 'shyt-hadoop-4032xxxcomcn,shyt-hadoop-4031xxxcomcn,shyt-hadoop-4030xxxcomcn:2181:/hbase-unsecure'

2、修改REPLICATION_SCOPE属性=1,全局模式,此数据会被复制给所有peer

alter 'testwang',{NAME => 'f' ,REPLICATION_SCOPE => '1'}

3、hbase(main):006:0> enable_table_replication 'testwang'

0 row(s) in 00860 seconds

The replication swith of table 'testwang' successfully enabled

验证在源集群 put 'testwang','1005','f:name','1005'

在目标集群 get 'testwang','1005'

校验数据量:通count

hbase orgapachehadoophbasemapreduceRowCounter 'testwang'

查看同步状态: status 'replication'

建议大表先进行snapshot方式同步,然后再利用copy进行增量数据同步,小表直接copy table数据迁移,最后配置hbase replication peer实时同步

hbase13

HTable 是我们对数据读取, *** 作的入口, implements HTableInterface, RegionLocator

内部构造

有一个检查 的动作待详细查看

关于BufferedMutator, 是用来缓存客户端的 *** 作的, hbase 将客户端的DML抽象成了 Mutation , 子类有: Append, Delete, Increment, Put *** 作

put方法将Put对象包装成Mutation,交给BufferedMutator, 到达设置的大小限制,或者主动调用flush *** 作, 会触发 backgroundFlushCommits(boolean synchronous) *** 作, 然后Mutation由 AsyncProcess 提交,详细查看 BufferedMutatorImpl 类

由 AscncProcess 提交后, (注释:Action类是将行与对应 *** 作结合的类), 由connection去寻找每一行对应的region位置, 包装action, server, region等信息添加到 MutiAction 中去, 这个类持有按照region分组的actions,

然后会对每个action都创建 SingleServerRequestRunnable (rpc caller 和rpc callable, caller call callable), 交给线程池去运行

删除 *** 作很简单: 创建 RegionServerCallable , 然后rpc工厂类创建rpc caller来调用它

get和scan都是继承了Query

get很简单:首先检查,这个get是否只是检查数据存在否, 并且检查是否指定了一致性等级(默认 (ConsistencySTRONG) ), 之后创建rpc请求Request, 如果 不是强一致性ConsistencyTIMELINE , 则调用 RpcRetryingCallerWithReadReplicas , 它可以从replica上读取, 返回的数据被标记为stale(读 *** 作是通过 ConsistencyTIMELINE ,然后读RPC将会首先发送到主region服务器上,在短时间内(hbaseclientprimaryCallTimeoutget默认为10ms),如果主region没有响应RPC会被发送到从region。 之后结果会从第一个完成RPC的返回。如果响应是来自主region副本,我们就会知道数据是最新的,ResultisStale() API是检查过期数据,如果结果是 从region返回,那么ResultisStale()为true,然后用户就可以检查关于过期数据可能的原因。)

当replica_id=0的regin不可以时候, 给所有的replica region发送请求,获取第一个从这些replica返回的数据, 客户端可以 ResultisStale()检查是否是来自副本的数据

Scan 类可以设置一系列的属性, startkey,endkey, 过滤器, 版本,缓存,最大取回大小等等, 但是获取数据是由 getScanner(Scan)返回的 ResultScanner *** 作的

返回的 ResultScanner 有small, Reversed,big和纯client 的不同,

什么是small scan

见 >

hbase概念:

非结构化的分布式的面向列存储非关系型的开源的数据库,根据谷歌的三大论文之一的bigtable

高宽厚表

作用:

为了解决大规模数据集合多重数据种类带来的挑战,尤其是大数据应用难题。

能干什么:

存储大量结果集数据,低延迟的随机查询。

sql:

结构化查询语言

nosql:

非关系型数据库,列存储和文档存储(查询低延迟),hbase是nosql的一个种类,其特点是列式存储。

非关系型数据库--列存储(hbase)

非关系型数据库--文档存储(MongoDB)

非关系型数据库--内存式存储(redis)

非关系型数据库--图形模型(graph)

hive和hbase区别

Hive的定位是数据仓库,虽然也有增删改查,但其删改查对应的是整张表而不是单行数据,查询的延迟较高。其本质是更加方便的使用mr的威力来进行离线分析的一个数据分析工具。

HBase的定位是hadoop的数据库,电脑培训发现是一个典型的Nosql,所以HBase是用来在大量数据中进行低延迟的随机查询的。

hbase运行方式:

standalonedistrubited

单节点和伪分布式

单节点:单独的进程运行在同一台机器上

hbase应用场景:

存储海量数据低延迟查询数据

hbase表由多行组成

hbase行一行在hbase中由行健和一个或多个列的值组成,按行健字母顺序排序的存储。

可以一起查。

1scan原理

HBase的查询实现只提供两种方式:

1、按指定RowKey 获取唯一一条记录,get方法(orgapachehadoophbaseclientGet)

Get 的方法处理分两种 : 设置了ClosestRowBefore 和没有设置的rowlock 主要是用来保证行的事务性,即每个get 是以一个row 来标记的一个row中可以有很多family 和column

2、按指定的条件获取一批记录,scan方法(orgapacheHadoophbaseclientScan)实现条件查询功能使用的就是scan 方式

1)scan 可以通过setCaching 与setBatch 方法提高速度(以空间换时间);

2)scan 可以通过setStartRow 与setEndRow 来限定范围([start,end)start 是闭区间,

end 是开区间)。范围越小,性能越高。

3)、scan 可以通过setFilter 方法添加过滤器,这也是分页、多条件查询的基础。

HBase中scan并不像大家想象的一样直接发送一个命令过去,服务器就将满足扫描条件的所有数据一次性返回给客户端。而实际上它的工作原理如下图所示:

上图右侧是HBase scan的客户端代码,其中for循环中每次遍历ResultScanner对象获取一行记录,实际上在客户端层面都会调用一次next请求。next请求整个流程可以分为如下几个步骤:

next请求首先会检查客户端缓存中是否存在还没有读取的数据行,如果有就直接返回,否则需要将next请求给HBase服务器端(RegionServer)。

如果客户端缓存已经没有扫描结果,就会将next请求发送给HBase服务器端。默认情况下,一次next请求仅可以请求100行数据(或者返回结果集总大小不超过2M)

服务器端接收到next请求之后就开始从BlockCache、HFile以及memcache中一行一行进行扫描,扫描的行数达到100行之后就返回给客户端,客户端将这100条数据缓存到内存并返回一条给上层业务。

HBase 每次 scan 的数据量可能会比较大,客户端不会一次性全部把数据从服务端拉回来。而是通过多次 rpc 分批次的拉取。类似于 TCP 协议里面一段一段的传输,可以做到细粒度的流量控制。至于如何调优,控制每次 rpc 拉取的数据量,就可以通过三个参数来控制。

setCaching => setNumberOfRowsFetchSize (客户端每次 rpc fetch 的行数)

setBatch => setColumnsChunkSize (客户端每次获取的列数)

setMaxResultSize => setMaxResultByteSize (客户端缓存的最大字节数)

hbaseclientscannercaching - (setCaching):HBase-098 默认值为为 100,HBase-12 默认值为 2147483647,即 IntegerMAX_VALUE。Scannext() 的一次 RPC 请求 fetch 的记录条数。配置建议:这个参数与下面的setMaxResultSize配合使用,在网络状况良好的情况下,自定义设置不宜太小, 可以直接采用默认值,不配置。

setBatch() 配置获取的列数,假如表有两个列簇 cf,info,每个列簇5个列。这样每行可能有10列了,setBatch() 可以控制每次获取的最大列数,进一步从列级别控制流量。配置建议:当列数很多,数据量大时考虑配置此参数,例如100列每次只获取50列。一般情况可以默认值(-1 不受限)。

hbaseclientscannermaxresultsize - (setMaxResultSize):HBase-098 无该项配置,HBase-12 默认值为 210241024,即 2M。Scannext() 的一次 RPC 请求 fetch 的数据量大小,目前 HBase-12 在 Caching 为默认值(Integer Max)的时候,实际使用这个参数控制 RPC 次数和流量。配置建议:如果网络状况较好(万兆网卡),scan 的数据量非常大,可以将这个值配置高一点。如果配置过高:则可能 loadCache 速度比较慢,导致 scan timeout 异常

hbaseserverscannermaxresultsize:服务端配置。HBase-098 无该项配置,HBase-12 新增,默认值为 10010241024,即 100M。该参数表示当 Scannext() 发起 RPC 后,服务端返回给客户端的最大字节数,防止 Server OOM。

要计算一次扫描 *** 作的RPC请求的次数,用户需要先计算出行数和每行列数的乘积。然后用这个值除以批量大小和每行列数中较小的那个值。最后再用除得的结果除以扫描器缓存值。 用数学公式表示如下:

RPC 返回的个数 = (row数 每行的列数)/ Min(每行列数,Batch大小) / Caching大小

Result 返回的个数 =( row数 每行的列数 )/ Min(每行列数,Batch大小)

复制

2Hbase Shell中使用

在hbase shell中查询数据,可以在hbase shell中直接使用过滤器:

# hbase shell > scan 'tablename',STARTROW=>'start',COLUMNS=>['family:qualifier'],FILTER=>"ValueFilter(=,'substring:88')"

复制

如上命令所示,查询的是表名为testByCrq,过滤方式是通过value过滤,匹配出value含111的数据。

因在hbase shell中一些 *** 作比较麻烦(比如删除字符需先按住ctrl在点击退格键),且退出后,查询的历史纪录不可考,故如下方式是比较方便的一种:

# echo "scan 'testByCrq', FILTER=>\"ValueFilter(=,'substring:111')\"" | hbase shell

复制

如上命令,可在bash中直接使用,表名是testByCrq,过滤方式是通过value过滤,匹配出value含111的数据,中间的"需要用\转义。

建表

create 'test1', 'lf', 'sf'

-- lf: column family of LONG values (binary value)

-- sf: column family of STRING values

复制

导入数据

put 'test1', 'user1|ts1', 'sf:c1', 'sku1'

put 'test1', 'user1|ts2', 'sf:c1', 'sku188'

put 'test1', 'user1|ts3', 'sf:s1', 'sku123'

put 'test1', 'user2|ts4', 'sf:c1', 'sku2'

put 'test1', 'user2|ts5', 'sf:c2', 'sku288'

put 'test1', 'user2|ts6', 'sf:s1', 'sku222'

put 'test1', 'user3|ts7', 'lf:c1', 12345

put 'test1', 'user3|ts8', 'lf:c1', 67890

复制

1限制条件

scan 'hbase:meta'

scan 'hbase:meta', {COLUMNS => 'info:regioninfo'}

scan 'ns1:t1', {COLUMNS => ['c1', 'c2'], LIMIT => 10, STARTROW => 'xyz'}

scan 't1', {COLUMNS => ['c1', 'c2'], LIMIT => 10, STARTROW => 'xyz'}

scan 't1', {COLUMNS => 'c1', TIMERANGE => [1303668804, 1303668904]}

scan 't1', {REVERSED => true}

复制

2Filter过滤

1rowkey查询

rowkey为user1开头的

scan 'test1', FILTER => "PrefixFilter ('user1')"

ROW COLUMN+CELL

user1|ts1 column=sf:c1, timestamp=1409122354868, value=sku1

user1|ts2 column=sf:c1, timestamp=1409122354918, value=sku188

user1|ts3 column=sf:s1, timestamp=1409122354954, value=sku123

复制

FirstKeyOnlyFilter: 一个rowkey可以有多个version,同一个rowkey的同一个column也会有多个的值, 只拿出key中的第一个column的第一个version KeyOnlyFilter: 只要key,不要value

scan 'test1', FILTER=>"FirstKeyOnlyFilter() AND ValueFilter(=,'binary:sku188') AND KeyOnlyFilter()"

ROW COLUMN+CELL

user1|ts2 column=sf:c1, timestamp=1409122354918, value=

复制

查询rowkey里面包含ts3的

scan 'test1', FILTER=>"RowFilter(=,'substring:ts3')"

ROW COLUMN+CELL

user1|ts3 column=sf:s1, timestamp=1554865926412, value=sku123

复制

从user1|ts2开始,找到所有的rowkey以user1开头的

scan 'test1', {STARTROW=>'user1|ts2', FILTER => "PrefixFilter ('user1')"}

ROW COLUMN+CELL

user1|ts2 column=sf:c1, timestamp=1409122354918, value=sku188

user1|ts3 column=sf:s1, timestamp=1409122354954, value=sku123

复制

从user1|ts2开始,找到所有的到rowkey以user2开头

scan 'test1', {STARTROW=>'user1|ts2', STOPROW=>'user2'}

ROW COLUMN+CELL

user1|ts2 column=sf:c1, timestamp=1409122354918, value=sku188 user1|ts3 column=sf:s1, timestamp=1409122354954, value=sku123

复制

2值查询

谁的值=sku188

scan 'test1', FILTER=>"ValueFilter(=,'binary:sku188')"

ROW COLUMN+CELL

user1|ts2 column=sf:c1, timestamp=1409122354918, value=sku188

复制

谁的值包含88

scan 'test1', FILTER=>"ValueFilter(=,'substring:88')"

ROW COLUMN+CELL

user1|ts2 column=sf:c1, timestamp=1409122354918, value=sku188

user2|ts5 column=sf:c2, timestamp=1409122355030, value=sku288

复制

值小于等于20000

scan 'test1', FILTER=>"ValueFilter(<=,'binary:20000')"

ROW COLUMN+CELL

user3|ts7 column=lf:c1, timestamp=1554866187587, value=12345

复制

注意:如果查询值大于20000,会查出所有值,因为“sku188”等值转为二进制后都大于20000。

substring不能使用小于等于等符号。

3列查询

column为c2,值包含88的用户

scan 'test1', FILTER=>"ColumnPrefixFilter('c2') AND ValueFilter(=,'substring:88')"

ROW COLUMN+CELL

user2|ts5 column=sf:c2, timestamp=1409122355030, value=sku288

复制

通过搜索进来的(column为s)值包含123或者222的用户

scan 'test1', FILTER=>"ColumnPrefixFilter('s') AND ( ValueFilter(=,'substring:123') OR ValueFilter(=,'substring:222') )"

ROW COLUMN+CELL

user1|ts3 column=sf:s1, timestamp=1409122354954, value=sku123

user2|ts6 column=sf:s1, timestamp=1409122355970, value=sku222

复制

列族查询

scan 'test1', FILTER=>"FamilyFilter(=,'substring:lf')"

ROW COLUMN+CELL

user3|ts7 column=lf:c1, timestamp=1554866187587, value=12345

user3|ts8 column=lf:c1, timestamp=1554866294485, value=67890

复制

4时间戳

scan 'test1',{FILTER=>"TimestampsFilter(1448069941270,1548069941230)" }

复制

3java查询

过滤器

HBase 的基本 API,包括增、删、改、查等。

增、删都是相对简单的 *** 作,与传统的 RDBMS 相比,这里的查询 *** 作略显苍白,只能根据特性的行键进行查询(Get)或者根据行键的范围来查询(Scan)。

HBase 不仅提供了这些简单的查询,而且提供了更加高级的过滤器(Filter)来查询。

过滤器的两类参数

过滤器可以根据列族、列、版本等更多的条件来对数据进行过滤,基于 HBase 本身提供的三维有序(行键,列,版本有序),这些过滤器可以高效地完成查询过滤的任务,带有过滤器条件的 RPC 查询请求会把过滤器分发到各个 RegionServer(这是一个服务端过滤器),这样也可以降低网络传输的压力。

使用过滤器至少需要两类参数:

一类是抽象的 *** 作符

HBase 提供了枚举类型的变量来表示这些抽象的 *** 作符:

LESS

LESS_OR_EQUAL

EQUAL

NOT_EQUAL

GREATER_OR_EQUAL

GREATER

NO_OP

另一类是比较器

代表具体的逻辑,例如字节级的比较,字符串级的比较等。

参数基础

有两个参数类在各类Filter中经常出现,统一介绍下:

(1)比较运算符 CompareFilterCompareOp

比较运算符用于定义比较关系,可以有以下几类值供选择:

EQUAL 相等

GREATER 大于

GREATER_OR_EQUAL 大于等于

LESS 小于

LESS_OR_EQUAL 小于等于

NOT_EQUAL 不等于

(2)比较器 ByteArrayComparable

通过比较器可以实现多样化目标匹配效果,比较器有以下子类可以使用:

BinaryComparator 匹配完整字节数组

BinaryPrefixComparator 匹配字节数组前缀

BitComparator

NullComparator

RegexStringComparator 正则表达式匹配

SubstringComparator 子串匹配

1,FilterList

FilterList 代表一个过滤器链,它可以包含一组即将应用于目标数据集的过滤器,过滤器间具有“与” FilterListOperatorMUST_PASS_ALL 和“或” FilterListOperatorMUST_PASS_ONE 关系。

官网实例代码,两个“或”关系的过滤器的写法:

FilterList list = new FilterList(FilterListOperatorMUST_PASS_ONE); //数据只要满足一组过滤器中的一个就可以

SingleColumnValueFilter filter1 = new SingleColumnValueFilter(cf,column,CompareOpEQUAL,BytestoBytes("my value"));

listadd(filter1);

SingleColumnValueFilter filter2 = new SingleColumnValueFilter(cf,column,CompareOpEQUAL,BytestoBytes("my other value"));

listadd(filter2);

Scan scan = new Scan();

scansetFilter(list);

复制

2,列值过滤器--SingleColumnValueFilter

SingleColumnValueFilter 用于测试列值相等 (CompareOpEQUAL ), 不等 (CompareOpNOT_EQUAL),或单侧范围 (eg, CompareOpGREATER)。

构造函数:

(1)比较的关键字是一个字符数组

SingleColumnValueFilter(byte[] family, byte[] qualifier, CompareFilterCompareOp compareOp, byte[] value)

(2)比较的关键字是一个比较器(比较器下一小节做介绍)

SingleColumnValueFilter(byte[] family, byte[] qualifier, CompareFilterCompareOp compareOp, ByteArrayComparable comparator)

注意:根据列的值来决定这一行数据是否返回,落脚点在行,而不是列。我们可以设置filtersetFilterIfMissing(true);如果为true,当这一列不存在时,不会返回,如果为false,当这一列不存在时,会返回所有的列信息

测试表user内容如下:

Table table = connectiongetTable(TableNamevalueOf("user"));

SingleColumnValueFilter scvf= new SingleColumnValueFilter(BytestoBytes("account"), BytestoBytes("name"),

CompareOpEQUAL,"zhangsan"getBytes());

scvfsetFilterIfMissing(true); //默认为false, 没有此列的数据也会返回 ,为true则只返回name=lisi的数据

Scan scan = new Scan();

scansetFilter(scvf);

ResultScanner resultScanner = tablegetScanner(scan);

for (Result result : resultScanner) {

List<Cell> cells= resultlistCells();

for (Cell cell : cells) {

String row = BytestoString(resultgetRow());

String family1 = BytestoString(CellUtilcloneFamily(cell));

String qualifier = BytestoString(CellUtilcloneQualifier(cell));

String value = BytestoString(CellUtilcloneValue(cell));

Systemoutprintln("[row:"+row+"],[family:"+family1+"],[qualifier:"+qualifier+"]"+ ",[value:"+value+"],[time:"+cellgetTimestamp()+"]");

}

}

复制

如果setFilterIfMissing(true), 有匹配只会返回当前列所在的行数据,基于行的数据 country 也返回了,因为他么你的rowkey是相同的

[row:zhangsan_1495527850824],[family:account],[qualifier:country],[value:china],[time:1495636452285]

[row:zhangsan_1495527850824],[family:account],[qualifier:name],[value:zhangsan],[time:1495556648729]

复制

如果setFilterIfMissing(false),有匹配的列的值相同会返回,没有此列的 name的也会返回,, 不匹配的name则不会返回。

下面 红色是匹配列内容的会返回,其他的不是account:name列也会返回,, name=lisi的不会返回,因为不匹配。

[row:lisi_1495527849910],[family:account],[qualifier:idcard],[value:42963319861234561230],[time:1495556647872]

[row:lisi_1495527850111],[family:account],[qualifier:password],[value:123451231236],[time:1495556648013]

[row:lisi_1495527850114],[family:address],[qualifier:city],[value:黄埔],[time:1495556648017]

[row:lisi_1495527850136],[family:address],[qualifier:province],[value:shanghai],[time:1495556648041]

[row:lisi_1495527850144],[family:info],[qualifier:age],[value:21],[time:1495556648045]

[row:lisi_1495527850154],[family:info],[qualifier:sex],[value:女],[time:1495556648056]

[row:lisi_1495527850159],[family:userid],[qualifier:id],[value:002],[time:1495556648060]

[row:wangwu_1495595824517],[family:userid],[qualifier:id],[value:009],[time:1495624624131]

[row:zhangsan_1495527850759],[family:account],[qualifier:idcard],[value:9897645464646],[time:1495556648664]

[row:zhangsan_1495527850759],[family:account],[qualifier:passport],[value:5689879898],[time:1495636370056]

[row:zhangsan_1495527850824],[family:account],[qualifier:country],[value:china],[time:1495636452285]

[row:zhangsan_1495527850824],[family:account],[qualifier:name],[value:zhangsan],[time:1495556648729]

[row:zhangsan_1495527850951],[family:address],[qualifier:province],[value:guangdong],[time:1495556648855]

[row:zhangsan_1495527850975],[family:info],[qualifier:age],[value:100],[time:1495556648878]

[row:zhangsan_1495527851080],[family:info],[qualifier:sex],[value:男],[time:1495556648983]

[row:zhangsan_1495527851095],[family:userid],[qualifier:id],[value:001],[time:1495556648996]

复制

3 键值元数据

由于HBase 采用键值对保存内部数据,键值元数据过滤器评估一行的键(ColumnFamily:Qualifiers)是否存在

31 基于列族过滤数据的FamilyFilter

构造函数:

FamilyFilter(CompareFilterCompareOp familyCompareOp, ByteArrayComparable familyComparator)

代码如下:

public static ResultScanner getDataFamilyFilter(String tableName,String family) throws IOException{

Table table = connectiongetTable(TableNamevalueOf("user"));

FamilyFilter ff = new FamilyFilter(CompareOpEQUAL ,

new BinaryComparator(BytestoBytes("account"))); //表中不存在account列族,过滤结果为空

// new BinaryPrefixComparator(value) //匹配字节数组前缀

// new RegexStringComparator(expr) // 正则表达式匹配

// new SubstringComparator(substr)// 子字符串匹配

Scan scan = new Scan();

// 通过scanaddFamily(family) 也可以实现此 *** 作

scansetFilter(ff);

ResultScanner resultScanner = tablegetScanner(scan);

return resultScanner;

}

复制

测试结果:查询的都是account列簇的内容

[row:lisi_1495527849910],[family:account],[qualifier:idcard],[value:42963319861234561230],[time:1495556647872]

[row:lisi_1495527850081],[family:account],[qualifier:name],[value:lisi],[time:1495556647984]

[row:lisi_1495527850111],[family:account],[qualifier:password],[value:123451231236],[time:1495556648013]

[row:zhangsan_1495527850759],[family:account],[qualifier:idcard],[value:9897645464646],[time:1495556648664]

[row:zhangsan_1495527850759],[family:account],[qualifier:passport],[value:5689879898],[time:1495636370056]

[row:zhangsan_1495527850824],[family:account],[qualifier:country],[value:china],[time:1495636452285]

[row:zhangsan_1495527850824],[family:account],[qualifier:name],[value:zhangsan],[time:1495556648729]

复制

32 基于限定符Qualifier(列)过滤数据的QualifierFilter

构造函数:

QualifierFilter(CompareFilterCompareOp op, ByteArrayComparable qualifierComparator)

Table table = connectiongetTable(TableNamevalueOf("user"));

QualifierFilter ff = new QualifierFilter(

CompareOpEQUAL , new BinaryComparator(BytestoBytes("name")));

// new BinaryPrefixComparator(value) //匹配字节数组前缀

// new RegexStringComparator(expr) // 正则表达式匹配

// new SubstringComparator(substr)// 子字符串匹配

Scan scan = new Scan();

// 通过scanaddFamily(family) 也可以实现此 *** 作

scansetFilter(ff);

ResultScanner resultScanner = tablegetScanner(scan);

复制

测试结果:只返回 name 的列内容

[row:lisi_1495527850081],[family:account],[qualifier:name],[value:lisi],[time:1495556647984]

[row:zhangsan_1495527850824],[family:account],[qualifier:name],[value:zhangsan],[time:1495556648729]

复制

33 基于列名(即Qualifier)前缀过滤数据的ColumnPrefixFilter

( 该功能用QualifierFilter也能实现 )

构造函数:

ColumnPrefixFilter(byte[] prefix)

Table table = connectiongetTable(TableNamevalueOf("user"));

ColumnPrefixFilter ff = new ColumnPrefixFilter(BytestoBytes("name"));

Scan scan = new Scan();

// 通过QualifierFilter的 newBinaryPrefixComparator也可以实现

scansetFilter(ff);

ResultScanner resultScanner = tablegetScanner(scan);

复制

返回结果:

[row:lisi_1495527850081],[family:account],[qualifier:name],[value:lisi],[time:1495556647984]

[row:zhangsan_1495527850824],[family:account],[qualifier:name],[value:zhangsan],[time:1495556648729]

复制

34 基于多个列名(即Qualifier)前缀过滤数据的MultipleColumnPrefixFilter

MultipleColumnPrefixFilter 和 ColumnPrefixFilter 行为差不多,但可以指定多个前缀

byte[][] prefixes = new byte[][] {BytestoBytes("name"), BytestoBytes("age")};

//返回所有行中以name或者age打头的列的数据

MultipleColumnPrefixFilter ff = new MultipleColumnPrefixFilter(prefixes);

Scan scan = new Scan();

scansetFilter(ff);

ResultScanner rs = tablegetScanner(scan);

复制

结果:

[row:lisi_1495527850081],[family:account],[qualifier:name],[value:lisi],[time:1495556647984]

[row:lisi_1495527850144],[family:info],[qualifier:age],[value:21],[time:1495556648045]

[row:zhangsan_1495527850824],[family:account],[qualifier:name],[value:zhangsan],[time:1495556648729]

[row:zhangsan_1495527850975],[family:info],[qualifier:age],[value:100],[time:1495556648878]

复制

35 基于列范围过滤数据ColumnRangeFilter

构造函数:

ColumnRangeFilter(byte[] minColumn, boolean minColumnInclusive, byte[] maxColumn, boolean maxColumnInclusive)

参数解释:

minColumn - 列范围的最小值,如果为空,则没有下限;

minColumnInclusive - 列范围是否包含minColumn ;

maxColumn - 列范围最大值,如果为空,则没有上限;

maxColumnInclusive - 列范围是否包含maxColumn 。

代码:

Table table = connectiongetTable(TableNamevalueOf("user"));

byte[] startColumn = BytestoBytes("a");

byte[] endColumn = BytestoBytes("d");

//返回所有列中从a到d打头的范围的数据,

ColumnRangeFilter ff = new ColumnRangeFilter(startColumn, true, endColumn, true);

Scan scan = new Scan();

scansetFilter(ff);

ResultScanner rs = tablegetScanner(scan);

复制

结果:返回列名开头是a 到 d的所有列数据

[row:lisi_1495527850114],[family:address],[qualifier:city],[value:黄埔],[time:1495556648017]

[row:lisi_1495527850144],[family:info],[qualifier:age],[value:21],[time:1495556648045]

[row:zhangsan_1495527850824],[family:account],[qualifier:country],[value:china],[time:1495636452285]

[row:zhangsan_1495527850975

1使用xshell或者crt等工具连接到hbase所在的服务器

2然后通过ls查找到hbase

3然后cd切换到hbase目录下

4bin/start-hbasesh

5bin/hbaseshell

6list查看该用户下的所有表格

前身:BigTable

网页搜索:

google分布式存储系统BigTable依赖GFS

Hbase(bigtable的开源实现): 高可靠、高性能、面向列、可伸缩

存储结构化和半结构化的数据

优点:

水平可扩展性特别好:

依赖:

文件存储系统:HDFS

海量数据处理:MapReduce

协同管理服务:Zookeeper

满足了:大数据量的实时计算

数据类型:

    RDBMS:关系数据模型、多种数据类型

    Hbase:

数据 *** 作:

存储模式:

索引:

数据维护:

可伸缩性:

        纵向扩展:

        水平扩展:

Hbase的访问接口:

            JAVA API

            shell

            thrift Gateway

            restful Gateway

            SQL接口:pig编写类sql  hive用hivesql访问Hbase

Hbase的数据类型:

        列限定符

        每个值都是未解释的bytes

        一个行可以有一个行键和多列

        表由列族组成

Hbase数据模型:

    列族支持动态扩展、保留旧版本(HDFS只能追加数据)

基础元素:

    行键 : rowkey

    列族

    列限定符

    单元格 (时间戳概念、对应数据版本)

坐标概念:

    四维定位:行键、列族、列限定符、时间戳

稀疏表

HBASE:面向列的存储:高数据压缩率、分析便捷

RDBMS :面向行存储,事务性 *** 作(记录完整)、不便于分析(需要全表扫描)

43 HBASE 的实现原理

431 库函数 、master服务器、region服务器

Master服务器:

分区信息进行维护和管理

维护region服务器列表

确认当前工作的region服务器

负责对region进行分配和负载平衡

对表的增删改查

region服务器:

客户端不依赖于Master获取位置信息

用户数据的存储和管理

Region服务器--10-1000个region -----Store是一个列族----每个列族就是一个Hfile----所有region公用1个Hlog

写数据流程:Region服务器---写缓存Memstore---写日志(Hlog)

读数据流程:Region服务器-读缓存Memstore(最新数据)----StoreFile

缓存刷新:周期性将缓存内容刷写到Storefile 清空缓存---Hlog写入标记

每次刷写会生成新的StoreFile 每个Store包含多个StoreFile

每个Region服务器都有一个自己的Hlog,将启动检查确认缓存刷新是否有新的内容需要刷写,发现则刷写新的storefile,完成后删除Hlog,开始对外提供服务

Storefile的合并,storefile 的数量达到阈值后,会进行合并。当Storefile超过大小阈值则会触发Region的分裂

44 Hlog的工作原理

Zookeeper负责监听region服务器,由master处理故障,通过故障服务器的Hlog恢复,按region切分Hlog,将region和对应的Hlog分配到新的region服务器上

一个HBASE表会被划分成多个Region(1G-2G 取决于服务器性能)

同一个region不会被拆分到不同服务器上

Region的寻找:

Meta表:regionID 服务器ID 存储元数据

Root表:只有一个region

三级寻址:

zookeeper文件---root表-多个meta表--多个用户数据表

客户端会有Hbase三层寻址的缓存,调用访问Hbase的接口,缓存失效后,再次寻址

zookeeper决定master服务器,确保只有一个master

45 Hbase的应用方案

性能优化:

1)时间靠近存放----将时间戳引入行键,使用Longmax-时间戳进行排序

2)提升读写性能,创建表时设置HcloumnDescriptorsetMemory=true,会将表放入内存的缓存中

3)节省存储·空间----设置最大版本数、保存最新版的数据,将最大版本参数设置为1

4)timetolive参数,会将过期数据自动清空

检测Hbase性能:

Maste-status(web浏览器查询)

ganglia

OpenTSDB

Armbari

sql 查询HBASE

1)hive整合hbase

2)Phoenix

Hbase 二级索引 (辅助索引)

默认只支持对rowkey进行索引

Hbase行访问:

1)单行键访问

2)确定起点和终点访问区间数据

3)全表扫描

二级索引样例:

    Hindex    Hbase+redis  Solr+ Hbase

二级索引的机制:

        Hbase Coprocessor 

        endpoint  ---存储过程

        observer----触发器

        通过Observer监测数据插入动作,同步写入索引表,完成对表和列的索引

      Hbase 主表 索引表

46 HBASE的shell命令

三种部署模式:单机 伪分布式  分布式

HDFS

创建表

create table, F1, F2, F3

list table

每次只能为1行的1列添加数据

put  table R1,R1:C1 ,“1,2,3”

scan  table  R1,{column='R1:C1'}

get  table

删除表:

disable table +drop table

47 JAVA API +HBASE

以上就是关于如何使用Java API *** 作Hbase全部的内容,包括:如何使用Java API *** 作Hbase、请描述怎样才能获得hbase表中哪些列能实现索引为什么、hbase 数据迁移等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/9452364.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-28
下一篇 2023-04-28

发表评论

登录后才能评论

评论列表(0条)

保存