NewInstallUserRunner.java
计算新增用户入口类
main方法:
只有一个方法ToolRunner.run
入口类implements Tool接口
Tool定义run方法
Tool 继承Configurable
Configurable定义两个方法
所以入口类需要实现3个方法
setConf方法实现
定义output-collector的类,反射用的
输出到mysql的时候,组成insert语句的
输出到mysql的链接信息
getConf方法实现
run方法的实现
processArgs方法解读
run方法第一条this.processArgs(conf, args)执行结束,
返回结果默认是昨天,或者运行时加入-d yyyy-DD-mm 格式输入的日期。
参数中用到的枚举方法
addDependencyJars 如果在服务器运行,需要设置为true 如果在本地运行 设置成false
http://www.jianshu.com/p/fd25d036d4dc
http://www.jianshu.com/p/1493de43dbf7
if (job.waitForCompletion(true)) {
// 执行成功, 需要计算总用户
this.calculateTotalUsers(conf)
return 0
} else {
return -1
}
方法1:最基本的数据导入方法。首先通过JDBC将原本关系型数据库中的数据读出到内存中,然后在使用HBase自带的客户端API将数据put到相应的表中。这种方法通用性强,只要写好接口就可以用,但是效率并不高。方法2:使用这种方法之前其实是需要先将数据导出到本地,以文本的形式保存,然后使用TableReudcer类编写MapReduce job。这种方法需要频繁的I/O *** 作,所以效率不高,容易导致HBase节点的不稳定。
方法3:importtsv是HBase内置的数据导入工具,目的是将tsv格式的文件加载到HBase中,本质上它是通过调用MapReudce Job实现数据导入的。注意:使用该方法,需要提前将数据导出到本地,以tsv格式存储。unbulk load模式的importtsv效果一般,适用于小型的数据。
方法4:bulk load是一个快速大量数据高效导入工具,相比于importtsv效率更高。
方法5:Sqoop是apache软件基金会的一个项目,可以用来实现关系型数据库和hdfs,hbase,hive之间的数据高效传输。只需要做一些简单的配置,通过Sqoop命令行指令就可以方便的实现数据导入和导出。
下面具体介绍每种方法的做法:
JDBC &HBase Client API
此处以MySql为例。首先在MySql数据库中创建database ‘test’,然后创建一张表’Info’,这里可以使用可视化软件(例如workbench),也可以直接在命令行输入相应指令:
:~$mysql -u root -p #root用户登录mysql
#创建Info表
CREATE TABLE Info (`ID` INT NOT NULL,
`Name` VARCHAR(45) NOT NULL,
`Number` INT NOT NULL,
`Time` VARCHAR(45) NOT NULL,
PRIMARY KEY (`ID`))
1
2
3
4
5
6
7
1
2
3
4
5
6
7
然后使用load指令将准备好的数据导入到Info中。数据格式与Info各字段的一致即可。
load data local infile '/home/lvyang/Desktop/test.csv' into table Info fields terminated by ','
1
1
到此数据已经准备好了。下面就可以进行数据导出导入过程了。
由于需要使用MySql的数据读取接口,所以我们需要到官网下载相应的connector,并将其中包含的mysql-connector-java-版本号-bin.jar文件取出,添加到自己Project的依赖库中。如果对maven比较熟的,就可以忽视这些配置过程,直接配置pom.xml文件即可完成项目依赖设置,方便快捷。
JDBC数据读取:
public class JDBCUtils {
Connection conn=null
ResultSet rs=null
String databaseName=null
String userName=null
String password=null
String url=null
public JDBCUtils(String databaseName, String userName, String password, String url)
public void connect()
public ResultSet readData(String sql)
public void writeToConsole(ResultSet rs,String[] keys)
public boolean writeToLocal(ResultSet rs,String path)
public void close()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
1
2
3
4
5
6
7
8
9
10
11
12
13
14
上面是JDBC工具类定义,可以根据自己的需求,自行添加或者删除方法。部分方法的实现如下,仅做参考:
public void connect(){
try {
Class.forName("com.mysql.jdbc.Driver") //注册驱动
System.out.println("load mysql driver successfully!")
conn= (Connection) DriverManager.getConnection(url)//获得connection对象,完成数据库连接
} catch (ClassNotFoundException e) {
e.printStackTrace()
} catch (SQLException e) {
e.printStackTrace()
}
}
public ResultSet readData(String sql){
try {
Statement stmt= (Statement) conn.createStatement()//创建statement对象
rs=stmt.executeQuery(sql) //执行query命令,获取ResultSet
} catch (SQLException e) {
e.printStackTrace()
}
return rs
}
public void close(){
if(rs!=null){
try {
rs.close()
} catch (SQLException e) {
e.printStackTrace()
}
}
if(conn!=null){
try {
conn.close()
} catch (SQLException e) {
e.printStackTrace()
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
到此JDBC读取数据部分已经完成,下面需要实现HBase数据导入功能:
public class HBaseUtils {
private static final Log LOG= LogFactory.getLog(HBaseUtils.class)//LOG用于输出部分关键信息
//Here I choose construct func to init configuration instance
//and then use connectionFactory to create init conn instance
//at last,I use conn to get Hadmin instance
//next I will use Hadmin to operate hbase tables
private Configuration conf=null
private Admin Hadmin=null
private Connection conn=null
public HBaseUtils(Configuration conf)
public void connect()
public boolean isExist(String tableName)
public boolean createTable(String tableName,String columnFamily)
public boolean writeOne(HashMap<String,String>data,String[] keys, String tableName,String columnFamily)
public boolean writeMore(List<HashMap<String,String>>list,String[] keys,String tableName,String columnFamily)
public boolean deleteTable(String tableName)
public void close()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
部分方法的实现如下,仅做参考:
public void connect(){
try {
//create connection to hbase
conn= ConnectionFactory.createConnection(conf)
//get Hadmin which is the database manager
Hadmin=conn.getAdmin()
} catch (IOException e) {
e.printStackTrace()
if(conn!=null){
try {
conn.close()
} catch (IOException e1) {
e1.printStackTrace()
}
}
if(Hadmin!=null){
try {
Hadmin.close()
} catch (IOException e1) {
e1.printStackTrace()
}
}
}
}
public boolean createTable(String tableName,String columnFamily){
/**
* create table
* here I limit the number of column family to 1
* So here only can create one column family's table
* **/
TableName table_name= TableName.valueOf(tableName)
LOG.info("Create table:"+tableName+" now!")
HTableDescriptor tableDesc=new HTableDescriptor(table_name)
HColumnDescriptor columnDesc=new HColumnDescriptor(columnFamily)
tableDesc.addFamily(columnDesc)
try {
this.Hadmin.createTable(tableDesc)
} catch (IOException e) {
e.printStackTrace()
return false
}
return true
}
public boolean writeOne(HashMap<String,String>data,String[] keys, String tableName,String columnFamily){
/**
* write one data to table at one time
* here I choose the first key as the rowKey,because I think the first key usually is the primary key
* **/
boolean flag=false
TableName table_name=TableName.valueOf(tableName)
byte[] column_family=columnFamily.getBytes()
try {
Table table=conn.getTable(table_name)
byte[] rowKey=data.get(keys[0]).toString().getBytes()//construct HBase table's rowKey
Put put=new Put(rowKey)
for(int i=0i<keys.lengthi++){
byte[] key=keys[i].getBytes()
byte[] value=data.get(keys[i]).toString().getBytes()
put.addColumn(column_family,key,value)
}
table.put(put)
table.close()
flag=true
} catch (IOException e) {
e.printStackTrace()
}
return flag
}
public void close(){
/**
* close connection
* **/
LOG.info("Close connection to HBase!")
if(Hadmin!=null){
try {
Hadmin.close()
} catch (IOException e) {
e.printStackTrace()
}
}
if(conn!=null){
try {
conn.close()
} catch (IOException e) {
e.printStackTrace()
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
到此,工具类已经基本完成,下面需要写一个主类:
public class deMain {
public static void main(String[] args){
//JDBC Init
Connection conn=null
String sql="select * from Info"
String databaseName="test"
String userName="root"
String password="****"
String url="jdbc:mysql://localhost:3306/"+databaseName+"?user="+userName+"&password="
+password+"&useUnicode=true&characterEncoding=utf-8"
//HBase Client Init
String tableName="test"
String columnFamily="info"
Configuration conf= HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum","127.0.0.1")
conf.set("hbase.master","localhost:9000")
}
//JDBC connection and read data
JDBCUtils ju=new JDBCUtils(databaseName,userName,password,url)
ju.connect()
ResultSet rs=ju.readData(sql)
//HBase connect
HBaseUtils hbu=new HBaseUtils(conf)
hbu.connect()
//依次读取rs中每条记录,并将其写入HBase相应表中即可
.........
//close all connection
hbu.close()
ju.close()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
到此,大功告成!由于是通过IDE端运行hadoop程序,所以我们需要将需要用到的依赖库导入,而这个过程如果不借助maven的话,就会特别的痛苦。下图是我配置的项目依赖包,仅供参考。
这里写图片描述
注:hadoop2.7.2,hbase1.2.1,zookeeper3.4.6
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)