六、HBase写入流程

六、HBase写入流程,第1张

1、HBase写入流程

HBase服务端没有提供update,delete接口,HBase中对数据的更新、删除 *** 作都认为是写入 *** 作,更新 *** 作会写入一个最小版本数据,删除 *** 作写写入一条标记为deleted的KV数据

1.1、写入流程三个阶段概况

1)客户端处理阶段:客户端将用户请求进行预处理,并根据集群元数据定位写入数据所在的RegionServer,将请求发送给RS

2)Region写入阶段:RS收到请求之后解析数据,首先把数据写入WAL,再写入对应Region对应的MemStore

3)MemStore Flush阶段:当Region中MemStore容量达到一定阈值之后,系统异步执行flush *** 作,将内存写入文件,形成HFile

1.2、用户写入请求在完成写入MemStore之后就会返回成功。MemStore Flush是一个异步执行的过程。

1.3、客户端处理阶段步骤详解:

1)客户端可以设置批量提交,如果设置了批量提交(autoflush=false)客户端会先将数据写入本地缓冲区等达到一定阈值之后才会提交。否则put请求直接会提交给服务端进行处理。

2)RS寻址,在提交之前HBase会在元数据表hbase:meta中根据rowkey找到她们归属的RS

2.1)客户端根据写入的表和rowkey在元数据中查找,如果能够查找出该rowkey所在的RS及Region,就直接发送写入请求

2.2)如果客户端没有找到rowkey信息,需要首先到zk上找到hbase:meta表所在的RS,向那RS发送查询请求获取元数据,然后在元数据中查找rowkey所在的RS,并将元数据缓存在本地,以备下次使用。

3)客户端发送远程RPC请求给RS,将数据写入目标Region的MemStore中

1.4、Region写入阶段步骤详解:

1)获取行锁,HBase中使用行锁保证对同一行数据的更新是互斥 *** 作,用以保证更新的原子性,要么成功要么失败

2)更新所有待写入keyValue的时间戳为当前系统时间

3)对一次写入同一个Region的一个或多个KeyValue构建一条WALEdit记录,这样做的目的是保证Region级别事务的写入原子性

4)把WALEdit写入HLog,HLog是存储在HDFS上需要sync *** 作把HLog真正落地到HDFS,在这一部暂时不用执行sync,HBase使用了disruptor实现了高效的生产者消费者队列,来异步实现WAL的追加写入 *** 纵

5)写入WAL之后再将数据写入MemStore

6)释放行锁

7)sync WAL:将HLog真正sync到HDFS,如果sync失败,执行回滚 *** 作将MemStore数据移除

8)结束写事务。更新对外可见,更新生效

1.5、MemStore Flush阶段详解:

1.5.1、触发flush条件

1.5.1.1、MemStore级别限制,当Rgion中任意一个MemStore大小达到阈值(hbase.hrgion.memstore.flush.size)默认128M

1.5.1.2、Region级别限制:当Region所有MemStore的大小达到了上限(hbase.hregion.memstore.block.multiplier * hbase.hrgion.memstore.flush.size)超过memstore大小的倍数达到该值则阻塞所有写入请求进行flush,自我保护默认是2.

1.5.1.3、RegionServer级别限制:当RS中MemStore的总大小超过低水位阈值hbase.regionserver.global.memstore.size.lower.limit * hbase.reagionserver.global.memstore.size RS则开始强制执行flush,按Region中MemStore大小从大到小进行flush,直到总MemStore大小下降到低水位。

1.5.1.4、当一个RegionServer中HLog数量达到一定上限(hbase.regionserver.maxlogs),系统选择最早的HLog对应的Rgion进行Flush

1.5.1.5、HBase定期Flush,默认是1小时确保MemStore不会长时间没有持久化。为了避免同一时间所有都进行flush,定期的flush *** 作有一定时间的随机延迟

1.5.1.6、手动flush,用户可以通过flush 'tablename'或者 flush 'regionname'对一个表或者Region进行flush

1.5.2、flush执行步骤

1.5.2.1、prepare阶段

遍历当前region下的MemStore做一个快照,然后新一个ConcurrentSkipListMap接受新的数据请求。此阶段需要通过锁来阻塞写请求,结束后释放锁,此过程持锁时间很短

1.5.2.2、flush阶段

对快照数据按照特定格式生成HFile持久化为临时文件放在.tmp目录下。这个过程涉及到磁盘IO *** 作,相对比较耗时

1.5.2.3、commit阶段

把临时文件移动到指定的CF目录下。再清空快照数据。

1.5.3、MemStore Flush对业务的影响

1.5.3.1、大部分MemStore Flush *** 作都不会对业务读写产生太大影响,

1.5.3.2、Region Server级别呆滞的flush,会对用户请求产生较大影响,会阻塞落在该RS上的写入 *** 作。

1.6、HLog写入模型

1.6.1、HLog持久化级别

SKIP_WAL:只写缓存,不写HLog,不可取

ASYNC_WAL:异步写入HLog

SYNC_WAL:同步写入日志文件,数据只是被写入文件系统缓存中并没有真正落盘。默认是此级别

FSYNC_WAL:同步将数据写入日志文件并强制落盘,这是最严格的写入级别,保证数据不丢失,性能相对较差

USER_DEFAULT:如果用户没有指定持久化级别,默认HBase使用SYN_WAL等级持久化数据put.setDurability(Durability.SYNC_WAL)

1.6.2、HLog写入模型

1、HLog写入需要经过3个阶段:手写将数据写入本地缓存,然后将本地缓存写入文件系统,最后执行syn *** 作同步到磁盘

2、HBase使用LMAX Disruptor框架实现了无锁有界队列 *** 作,写入模型如下图

2、BulkLoad 流程

2.1、BulkLoad使用场景:用户数据位于HDFS中,业务需要定期将这部分海量数据导入HBase系统.

2.2、核心流程分两步

2.2.1、HFile生成阶段:运行一个MapReduce任务,map需要自己实现,将HDFS文件中的数据读取出来组装一个复合KV,其中Key是rowkey,Value可以是KeyValue对象、Put对象甚至Delete对象;reduce由HBase负责,他会根据表信息配置一个全局有序的partitioner,将partitioner文件上传到HDFS集群,设置reduce task个数为目标表的Region个数。为每个Region生成一个对应的HFile文件

2.2.2、HFile导入阶段:HFile主备就绪后,将HFile加载到在线集群。

2.3、Bulkload遇到的一些常见问题

2.3.1、设置正确的权限

2.3.1、BulkLoad *** 作过程涉及到的用户:

第一步,通过MapReduce任务生成HFile。假设这个过程使用的HDFS账号为:u_mapreduce.

第二步,将HFile加载到HBase集群,假设这个步骤使用的账号为:u_load。

一般地:HBase集群由一个专门的账号用来管理HBase数据,该账号拥有HBase集群的所有表的最高权限,

同时可以读写HBase root目录下的所有文件,假设这个账号为:hbase_srv

2.3.2、权限设置

2.3.2.1、通过MapReduce任务生成HFile,HFile文件的owner为u_mapreduce。

2.3.2.2、u_load需要HFile文件以及目录的读、写权限。写的权限是因为在HFile跨越多个Region时,需要对HFile进行split *** 作。

另外u_load账号需要HBase表的Create权限

2.3.2.3、hbase_srv账号把HFile文件从用户的数据目录rename到HBase的数据目录,所以hbase_sHrv需要有用户数据目录及HFile的读取

权限,但事实上仅读取权限还不够,应为加载到HBase数据目录的HFile目录的owner仍为u_mapreduce。一旦执行完compaction *** 作

之后,这些文件无法挪动到archive目录,导致文件越来越多。这个问题在HBase 2.x 上修复。

2.3.2、影响Locality

如果生成HFile都在的HDFS集群和HBase所在HDFS集群时同一个,则MapReduce生成HFile,能够保证HFile与目标Region落在同一个机器上。这样就保证了Locality。由hbase.bulkload.locality.sensitive.enabled的参数控制整个逻辑,默认是true.所以默认保证locality的。

如果用户MapReduce在A集群上生成HFile,通过distcp拷贝到集群B.这样BulkLoad到HBase集群数据是没法保证Locality的。需要跑完BulkLoad之后再手动执行major compact,来提升loaclity。

2.3.3、BulkLoad数据复制

在1.3之前版本中,BulkLoad到HBase集群的数据并不会复制到备集群,这样可能无意识的导致备集群比主集群少了很多数据。在HBase1.3版本之后开始支持BulkLoad数据复制。需要开启开关:hbase.replicatition.bulkload.enabled=true。

方法1:最基本的数据导入方法。首先通过JDBC将原本关系型数据库中的数据读出到内存中,然后在使用HBase自带的客户端API将数据put到相应的表中。这种方法通用性强,只要写好接口就可以用,但是效率并不高。

方法2:使用这种方法之前其实是需要先将数据导出到本地,以文本的形式保存,然后使用TableReudcer类编写MapReduce job。这种方法需要频繁的I/O *** 作,所以效率不高,容易导致HBase节点的不稳定。

方法3:importtsv是HBase内置的数据导入工具,目的是将tsv格式的文件加载到HBase中,本质上它是通过调用MapReudce Job实现数据导入的。注意:使用该方法,需要提前将数据导出到本地,以tsv格式存储。unbulk load模式的importtsv效果一般,适用于小型的数据。

方法4:bulk load是一个快速大量数据高效导入工具,相比于importtsv效率更高。

方法5:Sqoop是apache软件基金会的一个项目,可以用来实现关系型数据库和hdfs,hbase,hive之间的数据高效传输。只需要做一些简单的配置,通过Sqoop命令行指令就可以方便的实现数据导入和导出。

下面具体介绍每种方法的做法:

JDBC &HBase Client API

此处以MySql为例。首先在MySql数据库中创建database ‘test’,然后创建一张表’Info’,这里可以使用可视化软件(例如workbench),也可以直接在命令行输入相应指令:

:~$mysql -u root -p #root用户登录mysql

#创建Info表

CREATE TABLE Info (`ID` INT NOT NULL,

`Name` VARCHAR(45) NOT NULL,

`Number` INT NOT NULL,

`Time` VARCHAR(45) NOT NULL,

PRIMARY KEY (`ID`))

1

2

3

4

5

6

7

1

2

3

4

5

6

7

然后使用load指令将准备好的数据导入到Info中。数据格式与Info各字段的一致即可。

load data local infile '/home/lvyang/Desktop/test.csv' into table Info fields terminated by ','

1

1

到此数据已经准备好了。下面就可以进行数据导出导入过程了。

由于需要使用MySql的数据读取接口,所以我们需要到官网下载相应的connector,并将其中包含的mysql-connector-java-版本号-bin.jar文件取出,添加到自己Project的依赖库中。如果对maven比较熟的,就可以忽视这些配置过程,直接配置pom.xml文件即可完成项目依赖设置,方便快捷。

JDBC数据读取:

public class JDBCUtils {

Connection conn=null

ResultSet rs=null

String databaseName=null

String userName=null

String password=null

String url=null

public JDBCUtils(String databaseName, String userName, String password, String url)

public void connect()

public ResultSet readData(String sql)

public void writeToConsole(ResultSet rs,String[] keys)

public boolean writeToLocal(ResultSet rs,String path)

public void close()

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

1

2

3

4

5

6

7

8

9

10

11

12

13

14

上面是JDBC工具类定义,可以根据自己的需求,自行添加或者删除方法。部分方法的实现如下,仅做参考:

public void connect(){

try {

Class.forName("com.mysql.jdbc.Driver") //注册驱动

System.out.println("load mysql driver successfully!")

conn= (Connection) DriverManager.getConnection(url)//获得connection对象,完成数据库连接

} catch (ClassNotFoundException e) {

e.printStackTrace()

} catch (SQLException e) {

e.printStackTrace()

}

}

public ResultSet readData(String sql){

try {

Statement stmt= (Statement) conn.createStatement()//创建statement对象

rs=stmt.executeQuery(sql) //执行query命令,获取ResultSet

} catch (SQLException e) {

e.printStackTrace()

}

return rs

}

public void close(){

if(rs!=null){

try {

rs.close()

} catch (SQLException e) {

e.printStackTrace()

}

}

if(conn!=null){

try {

conn.close()

} catch (SQLException e) {

e.printStackTrace()

}

}

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

到此JDBC读取数据部分已经完成,下面需要实现HBase数据导入功能:

public class HBaseUtils {

private static final Log LOG= LogFactory.getLog(HBaseUtils.class)//LOG用于输出部分关键信息

//Here I choose construct func to init configuration instance

//and then use connectionFactory to create init conn instance

//at last,I use conn to get Hadmin instance

//next I will use Hadmin to operate hbase tables

private Configuration conf=null

private Admin Hadmin=null

private Connection conn=null

public HBaseUtils(Configuration conf)

public void connect()

public boolean isExist(String tableName)

public boolean createTable(String tableName,String columnFamily)

public boolean writeOne(HashMap<String,String>data,String[] keys, String tableName,String columnFamily)

public boolean writeMore(List<HashMap<String,String>>list,String[] keys,String tableName,String columnFamily)

public boolean deleteTable(String tableName)

public void close()

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

部分方法的实现如下,仅做参考:

public void connect(){

try {

//create connection to hbase

conn= ConnectionFactory.createConnection(conf)

//get Hadmin which is the database manager

Hadmin=conn.getAdmin()

} catch (IOException e) {

e.printStackTrace()

if(conn!=null){

try {

conn.close()

} catch (IOException e1) {

e1.printStackTrace()

}

}

if(Hadmin!=null){

try {

Hadmin.close()

} catch (IOException e1) {

e1.printStackTrace()

}

}

}

}

public boolean createTable(String tableName,String columnFamily){

/**

* create table

* here I limit the number of column family to 1

* So here only can create one column family's table

* **/

TableName table_name= TableName.valueOf(tableName)

LOG.info("Create table:"+tableName+" now!")

HTableDescriptor tableDesc=new HTableDescriptor(table_name)

HColumnDescriptor columnDesc=new HColumnDescriptor(columnFamily)

tableDesc.addFamily(columnDesc)

try {

this.Hadmin.createTable(tableDesc)

} catch (IOException e) {

e.printStackTrace()

return false

}

return true

}

public boolean writeOne(HashMap<String,String>data,String[] keys, String tableName,String columnFamily){

/**

* write one data to table at one time

* here I choose the first key as the rowKey,because I think the first key usually is the primary key

* **/

boolean flag=false

TableName table_name=TableName.valueOf(tableName)

byte[] column_family=columnFamily.getBytes()

try {

Table table=conn.getTable(table_name)

byte[] rowKey=data.get(keys[0]).toString().getBytes()//construct HBase table's rowKey

Put put=new Put(rowKey)

for(int i=0i<keys.lengthi++){

byte[] key=keys[i].getBytes()

byte[] value=data.get(keys[i]).toString().getBytes()

put.addColumn(column_family,key,value)

}

table.put(put)

table.close()

flag=true

} catch (IOException e) {

e.printStackTrace()

}

return flag

}

public void close(){

/**

* close connection

* **/

LOG.info("Close connection to HBase!")

if(Hadmin!=null){

try {

Hadmin.close()

} catch (IOException e) {

e.printStackTrace()

}

}

if(conn!=null){

try {

conn.close()

} catch (IOException e) {

e.printStackTrace()

}

}

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

到此,工具类已经基本完成,下面需要写一个主类:

public class deMain {

public static void main(String[] args){

//JDBC Init

Connection conn=null

String sql="select * from Info"

String databaseName="test"

String userName="root"

String password="****"

String url="jdbc:mysql://localhost:3306/"+databaseName+"?user="+userName+"&password="

+password+"&useUnicode=true&characterEncoding=utf-8"

//HBase Client Init

String tableName="test"

String columnFamily="info"

Configuration conf= HBaseConfiguration.create()

conf.set("hbase.zookeeper.quorum","127.0.0.1")

conf.set("hbase.master","localhost:9000")

}

//JDBC connection and read data

JDBCUtils ju=new JDBCUtils(databaseName,userName,password,url)

ju.connect()

ResultSet rs=ju.readData(sql)

//HBase connect

HBaseUtils hbu=new HBaseUtils(conf)

hbu.connect()

//依次读取rs中每条记录,并将其写入HBase相应表中即可

.........

//close all connection

hbu.close()

ju.close()

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

到此,大功告成!由于是通过IDE端运行hadoop程序,所以我们需要将需要用到的依赖库导入,而这个过程如果不借助maven的话,就会特别的痛苦。下图是我配置的项目依赖包,仅供参考。

这里写图片描述

注:hadoop2.7.2,hbase1.2.1,zookeeper3.4.6


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/6729320.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-03-27
下一篇 2023-03-27

发表评论

登录后才能评论

评论列表(0条)

保存