用户画像-1: 大批量数据导入HBase

用户画像-1: 大批量数据导入HBase,第1张

用户画像-1: 大批量数据导入HBase

数据导入Hbase中常用的有三种方式:sqoop, Hbase importTsv, Hbase Bulkload,这三种方式,各有优缺点,下面将逐一介绍这三种方案的优缺点.

1. Sqoop直接导入

可以使用 SQOOP 将 MySQL 表的数据导入到 Hbase 表中,指定 表的名称、列簇及 RowKey ,范 例如下所示: 参数含义解释:

知识拓展:如何使用SQOOP进行增量导入数据至Hbase表,范例命令如下:

例一:

/export/servers/sqoop/bin/sqoop import 
-D sqoop.hbase.add.row.key=true 
--connect jdbc:mysql://bigdata-cdh01.itcast.cn:3306/tags_dat 
--username root 
--password 123456 
--table tbl_users 
--hbase-create-table 
--hbase-table tbl_users 
--column-family detail 
--hbase-row-key id 
--num-mappers 2
1 、 -D sqoop.hbase.add.row.key=true 是否将 rowkey 相关字段写入列族中,默认为 false ,默认情况下你将在列族中看不到任何 row key 中的字段。注意,该参数必须放在 import 之后。 2 、 --hbase-create-table 如果 hbase 中该表不存在则创建 3 、 --hbase-table 对应的 hbase 表名 4 、 --hbase-row-key hbase 表中的 rowkey, 注意格式 5 、 --column-family hbase 表的列族 例二:
/export/servers/sqoop/bin/sqoop import 
-D sqoop.hbase.add.row.key=true 
--connect jdbc:mysql://bigdata-cdh01.itcast.cn:3306/tags_dat 
--username root 
--password 123456 
--table tbl_logs 
--hbase-create-table 
--hbase-table tag_logs 
--column-family detail 
--hbase-row-key id 
--num-mappers 20 
--incremental lastmodified 
--check-column log_time 

相关增量导入参数说明:

使用 SQOOP 导入数据到 Hbase 表中,有一个限制: 需要指定 RDBMs 表中的某个字段作为 Hbase 表的 ROWKEY ,如果 Hbase 表的 ROWKEY 为多 个字段组合,就无法指定,所以此种方式有时候不能使用。 2. Hbase importTSV importTSV 功能描述:
将tsv(也可以是csv,每行数据中各个字段使用分隔符分割)格式文本数据,加载到Hbase表中。
1)、采用Put方式加载导入
2)、采用BulkLoad方式批量加载导入
使用如下命令,查看 Hbase 官方自带工具类使用说明:
HADOOP_HOME=/export/servers/hadoop 
Hbase_HOME=/export/servers/hbase 
HADOOP_CLASSPATH=`${Hbase_HOME}/bin/hbase mapredcp`:${Hbase_HOME}/conf ${HADOOP_HOME}/bin/yarn jar ${Hbase_HOME}/lib/hbase-server-1.2.0- cdh5.14.0.jar
执行上述命令提示如下信息: 其中 importtsv 就是将文本文件(比如 CSV 、 TSV 等格式)数据导入 Hbase 表工具类,使用 说明如下:
An example program must be given as the first argument.
Valid program names are:
CellCounter: Count cells in Hbase table.
WALPlayer: Replay WAL files.
completebulkload: Complete a bulk data load.
copytable: Export a table from local cluster to peer cluster.
export: Write table data to HDFS.
exportsnapshot: Export the specific snapshot to a given FileSystem.
import: import data written by Export.
importtsv: import data in TSV format.
rowcounter: Count rows in Hbase table.
verifyrep: Compare the data from tables in two different clusters.
Usage: importtsv -Dimporttsv.columns=a,b,c  
The column names of the TSV data must be specified using the -
Dimporttsv.columns
option. This option takes the form of comma-separated column names, where
each
column name is either a simple column family, or a columnfamily:qualifier.
The special column name Hbase_ROW_KEY is used to designate that this column
should be used as the row key for each imported record.
To instead generate HFiles of data to prepare for a bulk data load, pass
the option:
-Dimporttsv.bulk.output=/path/for/output
'-Dimporttsv.separator=|' - eg separate on pipes instead of tabs
For performance consider the following options:
-Dmapreduce.map.speculative=false
-Dmapreduce.reduce.speculative=false
分别演示采用直接 Put 方式和 HFile 文件方式将数据导入 Hbase 表,命令如下: 2.1 直接导入Put方式
HADOOP_HOME=/export/servers/hadoop
Hbase_HOME=/export/servers/hbase
HADOOP_CLASSPATH=`${Hbase_HOME}/bin/hbase mapredcp`:${Hbase_HOME}/conf
${HADOOP_HOME}/bin/yarn jar ${Hbase_HOME}/lib/hbase-server-1.2.0-
cdh5.14.0.jar 
importtsv 
-Dimporttsv.columns=Hbase_ROW_KEY,detail:log_id,detail:remote_ip,detail:s
ite_global_ticket,detail:site_global_session,detail:global_user_id,detai
l:cookie_text,detail:user_agent,detail:ref_url,detail:loc_url,detail:log
_time 
tbl_logs 
/user/hive/warehouse/tags_dat.db/tbl_logs
上述命令本质上运行一个 MapReduce 应用程序,将文本文件中每行数据转换封装到 Put 对象,然后插入到 Hbase 表中。
回顾一下:
采用Put方式向Hbase表中插入数据流程:
Put
-> WAL 预写日志
-> MemStore(内存) ,当达到一定大写Spill到磁盘上:
StoreFile(
HFile)
思考:
对海量数据插入,能否将数据直接保存为HFile文件,然后加载到Hbase表中
2.2 转换为HFile文件,再加载至表
# 1. 生成HFILES文件
HADOOP_HOME=/export/servers/hadoop
Hbase_HOME=/export/servers/hbase
HADOOP_CLASSPATH=`${Hbase_HOME}/bin/hbase mapredcp`:${Hbase_HOME}/conf
${HADOOP_HOME}/bin/yarn jar ${Hbase_HOME}/lib/hbase-server-1.2.0-
cdh5.14.0.jar 
importtsv 
-Dimporttsv.bulk.output=hdfs://bigdata-
cdh01.itcast.cn:8020/datas/output_hfile/tbl_logs 
-
Dimporttsv.columns=Hbase_ROW_KEY,detail:log_id,detail:remote_ip,detail:
site_global_ticket,detail:site_global_session,detail:global_user_id,det
ail:cookie_text,detail:user_agent,detail:ref_url,detail:loc_url,detail:
log_time 

tbl_logs  
/user/hive/warehouse/tags_dat.db/tbl_logs
 # 2. 将HFILE文件加载到表中 
HADOOP_CLASSPATH=`${Hbase_HOME}/bin/hbase mapredcp`:${Hbase_HOME}/conf ${HADOOP_HOME}/bin/yarn jar ${Hbase_HOME}/lib/hbase-server-1.2.0- cdh5.14.0.jar  completebulkload  
hdfs://bigdata-cdh01.itcast.cn:8020/datas/output_hfile/tbl_logs  tbl_logs
缺点: 1 )、 ROWKEY 不能是组合主键 只能是某一个字段 2 )、当表中列很多时,书写 -Dimporttsv.columns 值时很麻烦,容易出错 3. Hbase Bulkload 在大量数据需要写入 Hbase 时,通常有 put 方式和 bulkLoad 两种方式。 1 、 put 方式为单条插入,在 put 数据时会先将数据的更新 *** 作信息和数据信息 写入 WAL , 在写入到 WAL 后, 数据就会被放到 MemStore 中 ,当 MemStore 满后数据就会被 flush 到磁盘 ( 即形成 HFile 文件 ) ,在这种写 *** 作过程会涉及到 flush 、 split 、 compaction 等 *** 作,容易造 成节点不稳定,数据导入慢,耗费资源等问题,在海量数据的导入过程极大的消耗了系统 性能 ,避免这些问题最好的方法就是使用 BulkLoad 的方式来加载数据到 Hbase 中。
val put = new Put(rowKeyByts) 
put.addColumn(cf, column, value) 
put.addColumn(cf, column, value) 
put.addColumn(cf, column, value) 
put.addColumn(cf, column, value) 
table.put(put)

2 、 BulkLoader 利用 Hbase 数据 按照 HFile 格式存储在 HDFS 的原理,使用 MapReduce 直接批量 生成 HFile 格式文件后, RegionServers 再将 HFile 文件移动到相应的 Region 目录下 。

 

1)、Extract,异构数据源数据导入到 HDFS 之上。 
2)、Transform,通过用户代码,可以是 MR 或者 Spark 任务将数据转化为 HFile。 
3)、Load,HFile 通过 loadIncrementalHFiles 调用将 HFile 放置到 Region 对应的 HDFS 目录上,该过程可能涉及到文件切分。 
1、不会触发WAL预写日志,当表还没有数据时进行数据导入不会产生Flush和Split。 
2、减少接口调用的消耗,是一种快速写入的优化方式。
 Spark读写Hbase之使用Spark自带的API以及使用Bulk Load将大量数据导入Hbase: https://www.jianshu.com/p/b6c5a5ba30af

 

Bulkload 过程主要包括三部分: 1 )、 Extract ,异构数据源数据导入到 HDFS 之上。 2 )、 Transform ,通过用户代码,可以是 MR 或者 Spark 任务将数据转化为 HFile 。 3 )、 Load , HFile 通过 loadIncrementalHFiles 调用将 HFile 放置到 Region 对应的 HDFS 目录上,该过程可能涉及到文件切分。 1 、不会触发 WAL 预写日志,当表还没有数据时进行数据导入不会产生 Flush 和 Split 。 2 、减少接口调用的消耗,是一种快速写入的优化方式。 Spark 读写 Hbase 之使用 Spark 自带的 API 以及使用 Bulk Load 将大量数据导入 Hbase : https://www.jianshu.com/p/b6c5a5ba30af  
Bulkload过程主要包括三部分:
1、从数据源(通常是文本文件或其他的数据库)提取数据并上传到HDFS。
抽取数据到HDFS和Hbase并没有关系,所以大家可以选用自己擅长的方式进行。
2、利用MapReduce作业处理事先准备的数据 。
这一步需要一个MapReduce作业,并且大多数情况下还需要我们自己编写Map函数,而Reduce
函数不需要我们考虑,由Hbase提供。
该作业需要使用rowkey(行键)作为输出Key;KeyValue、Put或者Delete作为输出Value。
MapReduce作业需要使用HFileOutputFormat2来生成Hbase数据文件。
为了有效的导入数据,需要配置HFileOutputFormat2使得每一个输出文件都在一个合适的区
域中。为了达到这个目的,MapReduce作业会使用Hadoop的TotalOrderPartitioner类根据表的
key值将输出分割开来。
HFileOutputFormat2的方法configureIncrementalLoad()会自动的完成上面的工作。
3、告诉RegionServers数据的位置并导入数据。
这一步是最简单的,通常需要使用LoadIncrementalHFiles(更为人所熟知是
completebulkload工具),将文件在HDFS上的位置传递给它,它就会利用RegionServer将数据导
入到相应的区域。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5716386.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存