深入理解HBASE(3.4)RegionServer-Memstore

深入理解HBASE(3.4)RegionServer-Memstore,第1张

Region内每个ColumnFamily的数据组成一个Store。每个Store内包括一个MemStore和若干个StoreFile(HFile)组成。

HBase为了方便按照RowKey进行检索,要求HFile中数据都按照RowKey进行排序,Memstore数据在flush为HFile之前会进行一次排序

为了减少flush过程对读写的影响,HBase采用了类似于两阶段提交的方式,将整个flush过程分为三个阶段:

要避免“写阻塞”,貌似让Flush *** 作尽量的早于达到触发“写 *** 作”的阈值为宜。但是,这将导致频繁的Flush *** 作,而由此带来的后果便是读性能下降以及额外的负载。

每次的Memstore Flush都会为每个CF创建一个HFile。频繁的Flush就会创建大量的HFile。这样HBase在检索的时候,就不得不读取大量的HFile,读性能会受很大影响。

为预防打开过多HFile及避免读性能恶化,HBase有专门的HFile合并处理(HFile Compaction Process)。HBase会周期性的合并数个小HFile为一个大的HFile。明显的,有Memstore Flush产生的HFile越多,集群系统就要做更多的合并 *** 作(额外负载)。更糟糕的是:Compaction处理是跟集群上的其他请求并行进行的。当HBase不能够跟上Compaction的时候(同样有阈值设置项),会在RS上出现“写阻塞”。像上面说到的,这是最最不希望的。

提示 :严重关切RS上Compaction Queue 的size。要在其引起问题前,阻止其持续增大。

想了解更多HFile 创建和合并,可参看 Visualizing HBase Flushes And Compactions 。

理想情况下,在不超过hbaseregionserverglobalmemstoreupperLimit的情况下,Memstore应该尽可能多的使用内存(配置给Memstore部分的,而不是真个Heap的)。下图展示了一张“较好”的情况:

hbase使用的是jdk提供的ConcurrentSkipListMap,并对其进行了的封装,Map结构是<KeyValue,KeyValue>的形式。Concurrent表示线程安全。

SkipList是一种高效的数据结构,之前专门写过文章,这里就不表了

写入MemStore中的KV,被记录在kvset中。根据JVM内存的垃圾回收策略,在如下条件会触发Full GC。 1、内存满或者触发阈值。 2、内存碎片过多,造成新的分配找不到合适的内存空间。 RS上服务多个Region,如果不对KV的分配空间进行控制的话,由于访问的无序性以及KV长度的不同,每个Region上的KV会无规律地分散在内存上。Region执行了MemStore的Flush *** 作,再经过JVM GC之后就会出现零散的内存碎片现象,而进一步数据大量写入,就会触发Full-GC。

为了解决因为内存碎片造成的Full-GC的现象,RegionServer引入了MSLAB(HBASE-3455)。MSLAB全称是MemStore-Local Allocation Buffers。它通过预先分配连续的内存块,把零散的内存申请合并,有效改善了过多内存碎片导致的Full GC问题。 MSLAB的工作原理如下: 1、在MemStore初始化时,创建MemStoreLAB对象allocator。 2、创建一个2M大小的Chunk数组,偏移量起始设置为0。Chunk的大小可以通过参数hbasehregionmemstoremslabchunksize调整。 3、 当MemStore有KeyValue加入时,maybeCloneWithAllocator(KeyValue)函数调用allocator为其查找KeyValuegetBuffer()大小的空间,若KeyValue的大小低于默认的256K,会尝试在当前Chunk下查找空间,如果空间不够,MemStoreLAB重新申请新的Chunk。选中Chunk之后,会修改offset=原偏移量+KeyValuegetBuffer()length。chunk内控制每个KeyValue大小由hbasehregionmemstoremslabmaxallocation配置。 4、 空间检查通过的KeyValue,会拷贝到Chunk的数据块中。此时,原KeyValue由于不再被MemStore引用,会在接下来的JVM的Minor GC被清理。

MSLAB解决了因为碎片造成Full GC的问题,然而在MemStore被Flush到文件系统时,没有reference的chunk,需要GC来进行回收,因此,在更新 *** 作频繁发生时,会造成较多的Young GC。 针对该问题,HBASE-8163提出了MemStoreChunkPool的解决方案,方案已经被HBase-095版本接收。它的实现思路: 1、 创建chunk池来管理没有被引用的chunk,不再依靠JVM的GC回收。 2、 当一个chunk没有引用时,会被放入chunk池。 3、chunk池设置阈值,如果超过了,则会放弃放入新的chunk到chunk池。 4、 如果当需要新的chunk时,首先从chunk池中获取。 根据patch的测试显示,配置MemStoreChunkPool之后,YGC降低了40%,写性能有5%的提升。如果是095以下版本的用户,可以参考HBASE-8163给出patch。

1、HBase写入流程

HBase服务端没有提供update,delete接口,HBase中对数据的更新、删除 *** 作都认为是写入 *** 作,更新 *** 作会写入一个最小版本数据,删除 *** 作写写入一条标记为deleted的KV数据

11、写入流程三个阶段概况

1)客户端处理阶段:客户端将用户请求进行预处理,并根据集群元数据定位写入数据所在的RegionServer,将请求发送给RS

2)Region写入阶段:RS收到请求之后解析数据,首先把数据写入WAL,再写入对应Region对应的MemStore

3)MemStore Flush阶段:当Region中MemStore容量达到一定阈值之后,系统异步执行flush *** 作,将内存写入文件,形成HFile

12、用户写入请求在完成写入MemStore之后就会返回成功。MemStore Flush是一个异步执行的过程。

13、客户端处理阶段步骤详解:

1)客户端可以设置批量提交,如果设置了批量提交(autoflush=false)客户端会先将数据写入本地缓冲区等达到一定阈值之后才会提交。否则put请求直接会提交给服务端进行处理。

2)RS寻址,在提交之前HBase会在元数据表hbase:meta中根据rowkey找到她们归属的RS

21)客户端根据写入的表和rowkey在元数据中查找,如果能够查找出该rowkey所在的RS及Region,就直接发送写入请求

22)如果客户端没有找到rowkey信息,需要首先到zk上找到hbase:meta表所在的RS,向那RS发送查询请求获取元数据,然后在元数据中查找rowkey所在的RS,并将元数据缓存在本地,以备下次使用。

3)客户端发送远程RPC请求给RS,将数据写入目标Region的MemStore中

14、Region写入阶段步骤详解:

1)获取行锁,HBase中使用行锁保证对同一行数据的更新是互斥 *** 作,用以保证更新的原子性,要么成功要么失败

2)更新所有待写入keyValue的时间戳为当前系统时间

3)对一次写入同一个Region的一个或多个KeyValue构建一条WALEdit记录,这样做的目的是保证Region级别事务的写入原子性

4)把WALEdit写入HLog,HLog是存储在HDFS上需要sync *** 作把HLog真正落地到HDFS,在这一部暂时不用执行sync,HBase使用了disruptor实现了高效的生产者消费者队列,来异步实现WAL的追加写入 *** 纵

5)写入WAL之后再将数据写入MemStore

6)释放行锁

7)sync WAL:将HLog真正sync到HDFS,如果sync失败,执行回滚 *** 作将MemStore数据移除

8)结束写事务。更新对外可见,更新生效

15、MemStore Flush阶段详解:

151、触发flush条件

1511、MemStore级别限制,当Rgion中任意一个MemStore大小达到阈值(hbasehrgionmemstoreflushsize)默认128M

1512、Region级别限制:当Region所有MemStore的大小达到了上限(hbasehregionmemstoreblockmultiplier hbasehrgionmemstoreflushsize)超过memstore大小的倍数达到该值则阻塞所有写入请求进行flush,自我保护默认是2

1513、RegionServer级别限制:当RS中MemStore的总大小超过低水位阈值hbaseregionserverglobalmemstoresizelowerlimit hbasereagionserverglobalmemstoresize RS则开始强制执行flush,按Region中MemStore大小从大到小进行flush,直到总MemStore大小下降到低水位。

1514、当一个RegionServer中HLog数量达到一定上限(hbaseregionservermaxlogs),系统选择最早的HLog对应的Rgion进行Flush

1515、HBase定期Flush,默认是1小时确保MemStore不会长时间没有持久化。为了避免同一时间所有都进行flush,定期的flush *** 作有一定时间的随机延迟

1516、手动flush,用户可以通过flush 'tablename'或者 flush 'regionname'对一个表或者Region进行flush

152、flush执行步骤

1521、prepare阶段

遍历当前region下的MemStore做一个快照,然后新一个ConcurrentSkipListMap接受新的数据请求。此阶段需要通过锁来阻塞写请求,结束后释放锁,此过程持锁时间很短

1522、flush阶段

对快照数据按照特定格式生成HFile持久化为临时文件放在tmp目录下。这个过程涉及到磁盘IO *** 作,相对比较耗时

1523、commit阶段

把临时文件移动到指定的CF目录下。再清空快照数据。

153、MemStore Flush对业务的影响

1531、大部分MemStore Flush *** 作都不会对业务读写产生太大影响,

1532、Region Server级别呆滞的flush,会对用户请求产生较大影响,会阻塞落在该RS上的写入 *** 作。

16、HLog写入模型

161、HLog持久化级别

SKIP_WAL:只写缓存,不写HLog,不可取

ASYNC_WAL:异步写入HLog

SYNC_WAL:同步写入日志文件,数据只是被写入文件系统缓存中并没有真正落盘。默认是此级别

FSYNC_WAL:同步将数据写入日志文件并强制落盘,这是最严格的写入级别,保证数据不丢失,性能相对较差

USER_DEFAULT:如果用户没有指定持久化级别,默认HBase使用SYN_WAL等级持久化数据putsetDurability(DurabilitySYNC_WAL);

162、HLog写入模型

1、HLog写入需要经过3个阶段:手写将数据写入本地缓存,然后将本地缓存写入文件系统,最后执行syn *** 作同步到磁盘

2、HBase使用LMAX Disruptor框架实现了无锁有界队列 *** 作,写入模型如下图

2、BulkLoad 流程

21、BulkLoad使用场景:用户数据位于HDFS中,业务需要定期将这部分海量数据导入HBase系统

22、核心流程分两步

221、HFile生成阶段:运行一个MapReduce任务,map需要自己实现,将HDFS文件中的数据读取出来组装一个复合KV,其中Key是rowkey,Value可以是KeyValue对象、Put对象甚至Delete对象;reduce由HBase负责,他会根据表信息配置一个全局有序的partitioner,将partitioner文件上传到HDFS集群,设置reduce task个数为目标表的Region个数。为每个Region生成一个对应的HFile文件

222、HFile导入阶段:HFile主备就绪后,将HFile加载到在线集群。

23、Bulkload遇到的一些常见问题

231、设置正确的权限

231、BulkLoad *** 作过程涉及到的用户:

第一步,通过MapReduce任务生成HFile。假设这个过程使用的HDFS账号为:u_mapreduce

第二步,将HFile加载到HBase集群,假设这个步骤使用的账号为:u_load。

一般地:HBase集群由一个专门的账号用来管理HBase数据,该账号拥有HBase集群的所有表的最高权限,

同时可以读写HBase root目录下的所有文件,假设这个账号为:hbase_srv

232、权限设置

2321、通过MapReduce任务生成HFile,HFile文件的owner为u_mapreduce。

2322、u_load需要HFile文件以及目录的读、写权限。写的权限是因为在HFile跨越多个Region时,需要对HFile进行split *** 作。

另外u_load账号需要HBase表的Create权限

2323、hbase_srv账号把HFile文件从用户的数据目录rename到HBase的数据目录,所以hbase_sHrv需要有用户数据目录及HFile的读取

权限,但事实上仅读取权限还不够,应为加载到HBase数据目录的HFile目录的owner仍为u_mapreduce。一旦执行完compaction *** 作

之后,这些文件无法挪动到archive目录,导致文件越来越多。这个问题在HBase 2x 上修复。

232、影响Locality

如果生成HFile都在的HDFS集群和HBase所在HDFS集群时同一个,则MapReduce生成HFile,能够保证HFile与目标Region落在同一个机器上。这样就保证了Locality。由hbasebulkloadlocalitysensitiveenabled的参数控制整个逻辑,默认是true所以默认保证locality的。

如果用户MapReduce在A集群上生成HFile,通过distcp拷贝到集群B这样BulkLoad到HBase集群数据是没法保证Locality的。需要跑完BulkLoad之后再手动执行major compact,来提升loaclity。

233、BulkLoad数据复制

在13之前版本中,BulkLoad到HBase集群的数据并不会复制到备集群,这样可能无意识的导致备集群比主集群少了很多数据。在HBase13版本之后开始支持BulkLoad数据复制。需要开启开关:hbasereplicatitionbulkloadenabled=true。

HBase中,表会被划分为1…n个Region,被托管在RegionServer中。Region二个重要的属性:StartKey与 EndKey表示这个Region维护的rowKey范围,当我们要读/写数据时,如果rowKey落在某个start-end key范围内,那么就会定位到目标region并且读/写到相关的数据。
默认地,当我们只是通过HBaseAdmin指定TableDescriptor来创建一张表时,start-end key无边界,region的size越来越大时,大到 一定的阀值,就会找到一个midKey将region一分为二,成为2个region,这个过 程称为分裂(region-split)而midKey则为这二个region的临界

1总是往最大start-key的region写记录,之前分裂出来的region不会再被写数据,它们都处于半满状态
2split是比较耗时耗资源

合理设计rowkey 能让各个region 的并发请求 平均分配(趋于均匀) 使IO 效率达到最高
(预分区需要将hbasehregionmaxfilesize设置一个较大的值,默认是10G(0943 ) 也就是说单个region 默认大小是10G)

shell
指明分割点

HexStringSplit指明分割策略,-c 10指明要分割的区域数量,-f指明表中的列族,用“:”分割。

根据文件创建分区并压缩

COMPRESSION 默认值NONE,即不使用压缩
建议采用SNAPPY压缩

官方文档给出的建表提示

TTL 默认是 2147483647 即:IntegerMAX_VALUE 值 大概是68年,这个参数是说明该列族数据的 存活时间,单位是s,超过存过时间的数据将在表中不在显示,待下次major compact的时候再彻底删除数据,
若设置MIN_VERSIONS=>’0’ TTL时间戳过期后,将全部彻底删除该family 下所有的数据,如果MIN_VERSIONS 不等于0 那将保留最新的MIN_VERSIONS个版本的数据,其它的全部删除,比如MIN_VERSIONS=>’1’ 届时将保留一个最新版本的数据,其它版本的数据将不再保存。

VERSIONS 默认是3,这个参数的意思是数据保留三个 版本,如果数据随时都在更新,或老版本的数据无价值,那将此参数设为1 能节约2/3的空间
RegionSplitter提供三个用于预分割的工具:HexStringSplit、SplitAlgorithm、UniformSplit。
HexStringSplit和UniformSplit是两个预定义的静态类,可以直接使用;而SplitAlgorithm是一个接口,需要开发人员自己实现相应的分隔策略。如果是以十六进制字符串作为行键rowkey或者行键rowkey的前缀是十六进制字符串,用HexStringSplit就比较合适;UniformSplit会把行键均匀地分割多个部分,如果行将rowkey是随机的字节数组,用UniformSplit就比较合适;或者开发者根据需要实现分割策略。

原文: >hbase java是什么,让我们一起了解一下?

HBase是一个分布式的、面向列的开源数据库,具有高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBase技术可在廉价PC Server上搭建起大规模结构化存储集群。
如何使用JAVA语言 *** 作Hbase、整合Hbase?

可分为五步骤:
步骤1:新创建一个Java Project 。

步骤2:导入JAR包,在工程根目录下新建一个“lib”文件夹,将官方文档中的lib目录下的jar全部导入。

步骤3:修改开发机的hosts文件,在文件莫为增加一行虚拟机IP的映射信息。

步骤4:修改虚拟机的配置文件,修改虚拟机的设备名称,名称需要与之前两个配置文件的映射名称一致。

步骤5:实现查询、新建、删除等。
案例代码展示如下:
package hbase; import javaioIOException; import javautilArrayList; import javautilList; import orgapachehadoopconfConfiguration; import orgapachehadoophbaseCell; import orgapachehadoophbaseHBaseConfiguration; import orgapachehadoophbaseHColumnDescriptor; import orgapachehadoophbaseHTableDescriptor; import orgapachehadoophbaseTableName; import orgapachehadoophbaseclientAdmin; import orgapachehadoophbaseclientConnection; import orgapachehadoophbaseclientConnectionFactory; import orgapachehadoophbaseclientDelete; import orgapachehadoophbaseclientGet; import orgapachehadoophbaseclientPut; import orgapachehadoophbaseclientResult; import orgapachehadoophbaseclientResultScanner; import orgapachehadoophbaseclientScan; import orgapachehadoophbaseclientTable; import orgapachehadoophbaseexceptionsDeserializationException; import orgapachehadoophbasefilterFilter; import orgapachehadoophbasefilterSingleColumnValueFilter; import orgapachehadoophbasefilterCompareFilterCompareOp; import orgapachehadoophbaseutilBytes; import orgjunitBefore; import orgjunitTest; public class HBaseDemo { // 与HBase数据库的连接对象 Connection connection; // 数据库元数据 *** 作对象 Admin admin; @Before public void setUp() throws Exception { // 取得一个数据库连接的配置参数对象 Configuration conf = HBaseConfigurationcreate(); // 设置连接参数:HBase数据库所在的主机IP confset("hbasezookeeperquorum", "19216813713"); // 设置连接参数:HBase数据库使用的端口 confset("hbasezookeeperpropertyclientPort", "2181"); // 取得一个数据库连接对象 connection = ConnectionFactorycreateConnection(conf); // 取得一个数据库元数据 *** 作对象 admin = connectiongetAdmin(); } /      创建表     / public void createTable() throws IOException{ Systemoutprintln("---------------创建表 START-----------------"); // 数据表表名 String tableNameString = "t_book"; // 新建一个数据表表名对象 TableName tableName = TableNamevalueOf(tableNameString); // 如果需要新建的表已经存在 if(admintableExists(tableName)){ Systemoutprintln("表已经存在!"); } // 如果需要新建的表不存在 else{ // 数据表描述对象 HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); // 列族描述对象 HColumnDescriptor family= new HColumnDescriptor("base");; // 在数据表中新建一个列族 hTableDescriptoraddFamily(family); // 新建数据表 admincreateTable(hTableDescriptor); } Systemoutprintln("---------------创建表 END-----------------"); } /      查询整表数据     / @Test public void queryTable() throws IOException{ Systemoutprintln("---------------查询整表数据 START-----------------"); // 取得数据表对象 Table table = connectiongetTable(TableNamevalueOf("t_book")); // 取得表中所有数据 ResultScanner scanner = tablegetScanner(new Scan()); // 循环输出表中的数据 for (Result result : scanner) { byte[] row = resultgetRow(); Systemoutprintln("row key is:" + new String(row)); List  listCells = resultlistCells(); for (Cell cell : listCells) { byte[] familyArray = cellgetFamilyArray(); byte[] qualifierArray = cellgetQualifierArray(); byte[] valueArray = cellgetValueArray(); Systemoutprintln("row value is:" + new String(familyArray) + new String(qualifierArray) + new String(valueArray)); } } Systemoutprintln("---------------查询整表数据 END-----------------"); } /      按行键查询表数据     / @Test public void queryTableByRowKey() throws IOException{ Systemoutprintln("---------------按行键查询表数据 START-----------------"); // 取得数据表对象 Table table = connectiongetTable(TableNamevalueOf("t_book")); // 新建一个查询对象作为查询条件 Get get = new Get("row8"getBytes()); // 按行键查询数据 Result result = tableget(get); byte[] row = resultgetRow(); Systemoutprintln("row key is:" + new String(row)); List  listCells = resultlistCells(); for (Cell cell : listCells) { byte[] familyArray = cellgetFamilyArray(); byte[] qualifierArray = cellgetQualifierArray(); byte[] valueArray = cellgetValueArray(); Systemoutprintln("row value is:" + new String(familyArray) + new String(qualifierArray) + new String(valueArray)); } Systemoutprintln("---------------按行键查询表数据 END-----------------"); } /      按条件查询表数据     / @Test public void queryTableByCondition() throws IOException{ Systemoutprintln("---------------按条件查询表数据 START-----------------"); // 取得数据表对象 Table table = connectiongetTable(TableNamevalueOf("t_book")); // 创建一个查询过滤器 Filter filter = new SingleColumnValueFilter(BytestoBytes("base"), BytestoBytes("name"), CompareOpEQUAL, BytestoBytes("bookName6")); // 创建一个数据表扫描器 Scan scan = new Scan(); // 将查询过滤器加入到数据表扫描器对象 scansetFilter(filter); // 执行查询 *** 作,并取得查询结果 ResultScanner scanner = tablegetScanner(scan); // 循环输出查询结果 for (Result result : scanner) { byte[] row = resultgetRow(); Systemoutprintln("row key is:" + new String(row)); List  listCells = resultlistCells(); for (Cell cell : listCells) { byte[] familyArray = cellgetFamilyArray(); byte[] qualifierArray = cellgetQualifierArray(); byte[] valueArray = cellgetValueArray(); Systemoutprintln("row value is:" + new String(familyArray) + new String(qualifierArray) + new String(valueArray)); } } Systemoutprintln("---------------按条件查询表数据 END-----------------"); } /      清空表     / @Test public void truncateTable() throws IOException{ Systemoutprintln("---------------清空表 START-----------------"); // 取得目标数据表的表名对象 TableName tableName = TableNamevalueOf("t_book"); // 设置表状态为无效 admindisableTable(tableName); // 清空指定表的数据 admintruncateTable(tableName, true); Systemoutprintln("---------------清空表 End-----------------"); } /      删除表     / @Test public void deleteTable() throws IOException{ Systemoutprintln("---------------删除表 START-----------------"); // 设置表状态为无效 admindisableTable(TableNamevalueOf("t_book")); // 删除指定的数据表 admindeleteTable(TableNamevalueOf("t_book")); Systemoutprintln("---------------删除表 End-----------------"); } /      删除行     / @Test public void deleteByRowKey() throws IOException{ Systemoutprintln("---------------删除行 START-----------------"); // 取得待 *** 作的数据表对象 Table table = connectiongetTable(TableNamevalueOf("t_book")); // 创建删除条件对象 Delete delete = new Delete(BytestoBytes("row2")); // 执行删除 *** 作 tabledelete(delete); Systemoutprintln("---------------删除行 End-----------------"); } /      删除行(按条件)     / @Test public void deleteByCondition() throws IOException, DeserializationException{ Systemoutprintln("---------------删除行(按条件) START-----------------"); // 步骤1:调用queryTableByCondition()方法取得需要删除的数据列表 // 步骤2:循环步骤1的查询结果,对每个结果调用deleteByRowKey()方法 Systemoutprintln("---------------删除行(按条件) End-----------------"); } /      新建列族     / @Test public void addColumnFamily() throws IOException{ Systemoutprintln("---------------新建列族 START-----------------"); // 取得目标数据表的表名对象 TableName tableName = TableNamevalueOf("t_book"); // 创建列族对象 HColumnDescriptor columnDescriptor = new HColumnDescriptor("more"); // 将新创建的列族添加到指定的数据表 adminaddColumn(tableName, columnDescriptor); Systemoutprintln("---------------新建列族 END-----------------"); } /      删除列族     / @Test public void deleteColumnFamily() throws IOException{ Systemoutprintln("---------------删除列族 START-----------------"); // 取得目标数据表的表名对象 TableName tableName = TableNamevalueOf("t_book"); // 删除指定数据表中的指定列族 admindeleteColumn(tableName, "more"getBytes()); Systemoutprintln("---------------删除列族 END-----------------"); } /      插入数据     / @Test public void insert() throws IOException{ Systemoutprintln("---------------插入数据 START-----------------"); // 取得一个数据表对象 Table table = connectiongetTable(TableNamevalueOf("t_book")); // 需要插入数据库的数据集合 List  putList = new ArrayList (); Put put; // 生成数据集合 for(int i = 0; i 


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/yw/13409538.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-07-30
下一篇 2023-07-30

发表评论

登录后才能评论

评论列表(0条)

保存