用MapReduce分析Hbase将结果插入mysql中

用MapReduce分析Hbase将结果插入mysql中,第1张

从HBASE读取清洗过的数据,写入到mysql的表中

NewInstallUserRunner.java

计算新增用户入口类

main方法

只有一个方法ToolRunner.run

入口类implements Tool接口

Tool定义run方法

Tool 继承Configurable

Configurable定义两个方法

所以入口类需要实现3个方法

setConf方法实现

定义output-collector的类,反射用的

输出到mysql的时候,组成insert语句的

输出到mysql的链接信息

getConf方法实现

run方法的实现

processArgs方法解读

run方法第一条this.processArgs(conf, args)执行结束,

返回结果默认是昨天,或者运行时加入-d yyyy-DD-mm 格式输入的日期。

参数中用到的枚举方法

addDependencyJars 如果在服务器运行,需要设置为true 如果在本地运行 设置成false

http://www.jianshu.com/p/fd25d036d4dc

http://www.jianshu.com/p/1493de43dbf7

if (job.waitForCompletion(true)) {

// 执行成功, 需要计算总用户

this.calculateTotalUsers(conf)

return 0

} else {

return -1

}

方法1:最基本的数据导入方法。首先通过JDBC将原本关系型数据库中的数据读出到内存中,然后在使用HBase自带的客户端API将数据put到相应的表中。这种方法通用性强,只要写好接口就可以用,但是效率并不高。

方法2:使用这种方法之前其实是需要先将数据导出到本地,以文本的形式保存,然后使用TableReudcer类编写MapReduce job。这种方法需要频繁的I/O *** 作,所以效率不高,容易导致HBase节点的不稳定。

方法3:importtsv是HBase内置的数据导入工具,目的是将tsv格式的文件加载到HBase中,本质上它是通过调用MapReudce Job实现数据导入的。注意:使用该方法,需要提前将数据导出到本地,以tsv格式存储。unbulk load模式的importtsv效果一般,适用于小型的数据。

方法4:bulk load是一个快速大量数据高效导入工具,相比于importtsv效率更高。

方法5:Sqoop是apache软件基金会的一个项目,可以用来实现关系型数据库和hdfs,hbase,hive之间的数据高效传输。只需要做一些简单的配置,通过Sqoop命令行指令就可以方便的实现数据导入和导出。

下面具体介绍每种方法的做法:

JDBC &HBase Client API

此处以MySql为例。首先在MySql数据库中创建database ‘test’,然后创建一张表’Info’,这里可以使用可视化软件(例如workbench),也可以直接在命令行输入相应指令:

:~$mysql -u root -p #root用户登录mysql

#创建Info表

CREATE TABLE Info (`ID` INT NOT NULL,

`Name` VARCHAR(45) NOT NULL,

`Number` INT NOT NULL,

`Time` VARCHAR(45) NOT NULL,

PRIMARY KEY (`ID`))

1

2

3

4

5

6

7

1

2

3

4

5

6

7

然后使用load指令将准备好的数据导入到Info中。数据格式与Info各字段的一致即可。

load data local infile '/home/lvyang/Desktop/test.csv' into table Info fields terminated by ','

1

1

到此数据已经准备好了。下面就可以进行数据导出导入过程了。

由于需要使用MySql的数据读取接口,所以我们需要到官网下载相应的connector,并将其中包含的mysql-connector-java-版本号-bin.jar文件取出,添加到自己Project的依赖库中。如果对maven比较熟的,就可以忽视这些配置过程,直接配置pom.xml文件即可完成项目依赖设置,方便快捷。

JDBC数据读取:

public class JDBCUtils {

Connection conn=null

ResultSet rs=null

String databaseName=null

String userName=null

String password=null

String url=null

public JDBCUtils(String databaseName, String userName, String password, String url)

public void connect()

public ResultSet readData(String sql)

public void writeToConsole(ResultSet rs,String[] keys)

public boolean writeToLocal(ResultSet rs,String path)

public void close()

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

1

2

3

4

5

6

7

8

9

10

11

12

13

14

上面是JDBC工具类定义,可以根据自己的需求,自行添加或者删除方法。部分方法的实现如下,仅做参考:

public void connect(){

try {

Class.forName("com.mysql.jdbc.Driver") //注册驱动

System.out.println("load mysql driver successfully!")

conn= (Connection) DriverManager.getConnection(url)//获得connection对象,完成数据库连接

} catch (ClassNotFoundException e) {

e.printStackTrace()

} catch (SQLException e) {

e.printStackTrace()

}

}

public ResultSet readData(String sql){

try {

Statement stmt= (Statement) conn.createStatement()//创建statement对象

rs=stmt.executeQuery(sql) //执行query命令,获取ResultSet

} catch (SQLException e) {

e.printStackTrace()

}

return rs

}

public void close(){

if(rs!=null){

try {

rs.close()

} catch (SQLException e) {

e.printStackTrace()

}

}

if(conn!=null){

try {

conn.close()

} catch (SQLException e) {

e.printStackTrace()

}

}

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

到此JDBC读取数据部分已经完成,下面需要实现HBase数据导入功能:

public class HBaseUtils {

private static final Log LOG= LogFactory.getLog(HBaseUtils.class)//LOG用于输出部分关键信息

//Here I choose construct func to init configuration instance

//and then use connectionFactory to create init conn instance

//at last,I use conn to get Hadmin instance

//next I will use Hadmin to operate hbase tables

private Configuration conf=null

private Admin Hadmin=null

private Connection conn=null

public HBaseUtils(Configuration conf)

public void connect()

public boolean isExist(String tableName)

public boolean createTable(String tableName,String columnFamily)

public boolean writeOne(HashMap<String,String>data,String[] keys, String tableName,String columnFamily)

public boolean writeMore(List<HashMap<String,String>>list,String[] keys,String tableName,String columnFamily)

public boolean deleteTable(String tableName)

public void close()

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

部分方法的实现如下,仅做参考:

public void connect(){

try {

//create connection to hbase

conn= ConnectionFactory.createConnection(conf)

//get Hadmin which is the database manager

Hadmin=conn.getAdmin()

} catch (IOException e) {

e.printStackTrace()

if(conn!=null){

try {

conn.close()

} catch (IOException e1) {

e1.printStackTrace()

}

}

if(Hadmin!=null){

try {

Hadmin.close()

} catch (IOException e1) {

e1.printStackTrace()

}

}

}

}

public boolean createTable(String tableName,String columnFamily){

/**

* create table

* here I limit the number of column family to 1

* So here only can create one column family's table

* **/

TableName table_name= TableName.valueOf(tableName)

LOG.info("Create table:"+tableName+" now!")

HTableDescriptor tableDesc=new HTableDescriptor(table_name)

HColumnDescriptor columnDesc=new HColumnDescriptor(columnFamily)

tableDesc.addFamily(columnDesc)

try {

this.Hadmin.createTable(tableDesc)

} catch (IOException e) {

e.printStackTrace()

return false

}

return true

}

public boolean writeOne(HashMap<String,String>data,String[] keys, String tableName,String columnFamily){

/**

* write one data to table at one time

* here I choose the first key as the rowKey,because I think the first key usually is the primary key

* **/

boolean flag=false

TableName table_name=TableName.valueOf(tableName)

byte[] column_family=columnFamily.getBytes()

try {

Table table=conn.getTable(table_name)

byte[] rowKey=data.get(keys[0]).toString().getBytes()//construct HBase table's rowKey

Put put=new Put(rowKey)

for(int i=0i<keys.lengthi++){

byte[] key=keys[i].getBytes()

byte[] value=data.get(keys[i]).toString().getBytes()

put.addColumn(column_family,key,value)

}

table.put(put)

table.close()

flag=true

} catch (IOException e) {

e.printStackTrace()

}

return flag

}

public void close(){

/**

* close connection

* **/

LOG.info("Close connection to HBase!")

if(Hadmin!=null){

try {

Hadmin.close()

} catch (IOException e) {

e.printStackTrace()

}

}

if(conn!=null){

try {

conn.close()

} catch (IOException e) {

e.printStackTrace()

}

}

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

到此,工具类已经基本完成,下面需要写一个主类:

public class deMain {

public static void main(String[] args){

//JDBC Init

Connection conn=null

String sql="select * from Info"

String databaseName="test"

String userName="root"

String password="****"

String url="jdbc:mysql://localhost:3306/"+databaseName+"?user="+userName+"&password="

+password+"&useUnicode=true&characterEncoding=utf-8"

//HBase Client Init

String tableName="test"

String columnFamily="info"

Configuration conf= HBaseConfiguration.create()

conf.set("hbase.zookeeper.quorum","127.0.0.1")

conf.set("hbase.master","localhost:9000")

}

//JDBC connection and read data

JDBCUtils ju=new JDBCUtils(databaseName,userName,password,url)

ju.connect()

ResultSet rs=ju.readData(sql)

//HBase connect

HBaseUtils hbu=new HBaseUtils(conf)

hbu.connect()

//依次读取rs中每条记录,并将其写入HBase相应表中即可

.........

//close all connection

hbu.close()

ju.close()

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

到此,大功告成!由于是通过IDE端运行hadoop程序,所以我们需要将需要用到的依赖库导入,而这个过程如果不借助maven的话,就会特别的痛苦。下图是我配置的项目依赖包,仅供参考。

这里写图片描述

注:hadoop2.7.2,hbase1.2.1,zookeeper3.4.6


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/6100673.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-03-14
下一篇 2023-03-14

发表评论

登录后才能评论

评论列表(0条)

保存