如何执行hbase 的mapreduce job

如何执行hbase 的mapreduce job,第1张

package test

import java.io.IOException

import java.util.Map

import org.apache.hadoop.conf.Configuration

import org.apache.hadoop.hbase.HBaseConfiguration

import org.apache.hadoop.hbase.HColumnDescriptor

import org.apache.hadoop.hbase.HTableDescriptor

import org.apache.hadoop.hbase.client.HBaseAdmin

import org.apache.hadoop.hbase.client.HTable

import org.apache.hadoop.hbase.client.Put

import org.apache.hadoop.hbase.client.Result

public class Htable {

/**

* @param args

*/

public static void main(String[] args) throws IOException {

// TODO Auto-generated method stub

Configuration hbaseConf = HBaseConfiguration.create()

HBaseAdmin admin = new HBaseAdmin(hbaseConf)

HTableDescriptor htableDescriptor = new HTableDescriptor("table"

.getBytes()) //set the name of table

htableDescriptor.addFamily(new HColumnDescriptor("fam1"))//set the name of column clusters

admin.createTable(htableDescriptor)//create a table

HTable table = new HTable(hbaseConf, "table")//get instance of table.

for (int i = 0i <3i++) { //for is number of rows

Put putRow = new Put(("row" + i).getBytes())//the ith row

putRow.add("fam1".getBytes(), "col1".getBytes(), "vaule1"

.getBytes()) //set the name of column and value.

putRow.add("fam1".getBytes(), "col2".getBytes(), "vaule2"

.getBytes())

putRow.add("fam1".getBytes(), "col3".getBytes(), "vaule3"

.getBytes())

table.put(putRow)

}

for(Result result: table.getScanner("fam1".getBytes())){//get data of column clusters

for(Map.Entry<byte[], byte[]>entry : result.getFamilyMap("fam1".getBytes()).entrySet()){//get collection of result

String column = new String(entry.getKey())

String value = new String(entry.getValue())

System.out.println(column+","+value)

}

}

admin.disableTable("table".getBytes())//disable the table

admin.deleteTable("table".getBytes()) //drop the tbale

}

}

第一种情况:

1.测试hbase:

a)cd hbase-0.90.4

b)bin/start-hbase.sh

c)bin/hbase shell

d)create ‘database’,’cf’

e)list

f) 如果成功则可以看到有下面的结果:

hbase(main):001:0>list TABLE database1 row(s)in 0.5910 seconds

2. 创建Java project, 将hbase-0.90.4下面的lib目录拷贝到工程,将其中的jar包加入classpath, 还有hbase-0.90.5.jar 和 test.jar

3. 创建类

public class HelloHBase {

public static void main(String[] args) throws IOException {

Configuration conf = HBaseConfiguration.create()

conf.set("hbase.zookeeper.quorum", "192.168.128.128")

HBaseAdmin admin = new HBaseAdmin(conf)

HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes("database"))

byte[] name = tableDescriptor.getName()

System.out.println(new String(name))

HColumnDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies()

for (HColumnDescriptor d : columnFamilies) {

System.out.println(d.getNameAsString())

}

}

运行,此时应该打印出下面两行:

database cf

若没有,说明配置失败,请检查其他设置。

==============================================

问题1:

java.net.ConnectException: Connection refused: no further information

a. zookeeper.ClientCnxn: Session 0x0 for server null,

解决: zppkeeper未启动,或无法连接,从查看各节点zookeeper启动状态、端口占用、防火墙等方面查看原因

b. getMaster attempt 4 of 10 failedretrying after sleep of 2000

解决:查看 master log , 如果有信息org.apache.hadoop.hbase.regionserver.HRegionServer: Serving as BRDVM0240,43992,1373943529301, RPC listening on /127.0.0.1:43992, sessionid=0x13fe56a7d4b0001

则说明, HRegionServer

监听的端口是localhost 127.0.0.1, 需要修改 server端 /etc/hosts 文件, 127.0.0.1

servername localhost.localdomain localhost

去掉 servername, 然后重启hbase

第二种情况:

java.net.ConnectException: Connection refused: no further information

at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)

at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)

at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1119)

12/09/03 15:37:15 INFO zookeeper.ClientCnxn: Opening socket connection to server /192.168.0.118:2181

12/09/03 15:37:16 INFO zookeeper.ClientCnxn: EventThread shut down

12/09/03 15:37:16 INFO zookeeper.ZooKeeper: Session: 0x0 closed

Exception in thread "main" org.apache.hadoop.hbase.ZooKeeperConnectionException: HBase is able to connect to ZooKeeper but the connection closes immediately. This could be a sign that the server has too many connections (30 is the default). Consider inspecting your ZK server logs for that error and then make sure you are reusing HBaseConfiguration as often as you can. See HTable's javadoc for more information.

at org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher.<init>(ZooKeeperWatcher.java:156)

at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.getZooKeeperWatcher(HConnectionManager.java:1209)

at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.setupZookeeperTrackers(HConnectionManager.java:511)

at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.<init>(HConnectionManager.java:502)

at org.apache.hadoop.hbase.client.HConnectionManager.getConnection(HConnectionManager.java:172)

at org.apache.hadoop.hbase.client.HBaseAdmin.<init>(HBaseAdmin.java:92)

at com.biencloud.test.first_hbase.main(first_hbase.java:22)

Caused by: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase

at org.apache.zookeeper.KeeperException.create(KeeperException.java:90)

at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)

at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:809)

at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:837)

at org.apache.hadoop.hbase.zookeeper.ZKUtil.createAndFailSilent(ZKUtil.java:931)

at org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher.<init>(ZooKeeperWatcher.java:134)

... 6 more

这个错误说明eclipse没有连接到zookeeper,在程序中添加zookeeper配置信息即可,具体如下:

Configuration conf=HBaseConfiguration.create()

conf.set("hbase.zookeeper.quorum","192.168.0.118, 192.168.0.186, 192.168.0.182")

conf.set("hbase.zookeeper.property.clientPort","2222")

附上出处链接:http://www.aboutyun.com/thread-5866-1-1.html

方法1:最基本的数据导入方法。首先通过JDBC将原本关系型数据库中的数据读出到内存中,然后在使用HBase自带的客户端API将数据put到相应的表中。这种方法通用性强,只要写好接口就可以用,但是效率并不高。

方法2:使用这种方法之前其实是需要先将数据导出到本地,以文本的形式保存,然后使用TableReudcer类编写MapReduce job。这种方法需要频繁的I/O *** 作,所以效率不高,容易导致HBase节点的不稳定。

方法3:importtsv是HBase内置的数据导入工具,目的是将tsv格式的文件加载到HBase中,本质上它是通过调用MapReudce Job实现数据导入的。注意:使用该方法,需要提前将数据导出到本地,以tsv格式存储。unbulk load模式的importtsv效果一般,适用于小型的数据。

方法4:bulk load是一个快速大量数据高效导入工具,相比于importtsv效率更高。

方法5:Sqoop是apache软件基金会的一个项目,可以用来实现关系型数据库和hdfs,hbase,hive之间的数据高效传输。只需要做一些简单的配置,通过Sqoop命令行指令就可以方便的实现数据导入和导出。

下面具体介绍每种方法的做法:

JDBC &HBase Client API

此处以MySql为例。首先在MySql数据库中创建database ‘test’,然后创建一张表’Info’,这里可以使用可视化软件(例如workbench),也可以直接在命令行输入相应指令:

:~$mysql -u root -p #root用户登录mysql

#创建Info表

CREATE TABLE Info (`ID` INT NOT NULL,

`Name` VARCHAR(45) NOT NULL,

`Number` INT NOT NULL,

`Time` VARCHAR(45) NOT NULL,

PRIMARY KEY (`ID`))

1

2

3

4

5

6

7

1

2

3

4

5

6

7

然后使用load指令将准备好的数据导入到Info中。数据格式与Info各字段的一致即可。

load data local infile '/home/lvyang/Desktop/test.csv' into table Info fields terminated by ','

1

1

到此数据已经准备好了。下面就可以进行数据导出导入过程了。

由于需要使用MySql的数据读取接口,所以我们需要到官网下载相应的connector,并将其中包含的mysql-connector-java-版本号-bin.jar文件取出,添加到自己Project的依赖库中。如果对maven比较熟的,就可以忽视这些配置过程,直接配置pom.xml文件即可完成项目依赖设置,方便快捷。

JDBC数据读取:

public class JDBCUtils {

Connection conn=null

ResultSet rs=null

String databaseName=null

String userName=null

String password=null

String url=null

public JDBCUtils(String databaseName, String userName, String password, String url)

public void connect()

public ResultSet readData(String sql)

public void writeToConsole(ResultSet rs,String[] keys)

public boolean writeToLocal(ResultSet rs,String path)

public void close()

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

1

2

3

4

5

6

7

8

9

10

11

12

13

14

上面是JDBC工具类定义,可以根据自己的需求,自行添加或者删除方法。部分方法的实现如下,仅做参考:

public void connect(){

try {

Class.forName("com.mysql.jdbc.Driver") //注册驱动

System.out.println("load mysql driver successfully!")

conn= (Connection) DriverManager.getConnection(url)//获得connection对象,完成数据库连接

} catch (ClassNotFoundException e) {

e.printStackTrace()

} catch (SQLException e) {

e.printStackTrace()

}

}

public ResultSet readData(String sql){

try {

Statement stmt= (Statement) conn.createStatement()//创建statement对象

rs=stmt.executeQuery(sql) //执行query命令,获取ResultSet

} catch (SQLException e) {

e.printStackTrace()

}

return rs

}

public void close(){

if(rs!=null){

try {

rs.close()

} catch (SQLException e) {

e.printStackTrace()

}

}

if(conn!=null){

try {

conn.close()

} catch (SQLException e) {

e.printStackTrace()

}

}

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

到此JDBC读取数据部分已经完成,下面需要实现HBase数据导入功能:

public class HBaseUtils {

private static final Log LOG= LogFactory.getLog(HBaseUtils.class)//LOG用于输出部分关键信息

//Here I choose construct func to init configuration instance

//and then use connectionFactory to create init conn instance

//at last,I use conn to get Hadmin instance

//next I will use Hadmin to operate hbase tables

private Configuration conf=null

private Admin Hadmin=null

private Connection conn=null

public HBaseUtils(Configuration conf)

public void connect()

public boolean isExist(String tableName)

public boolean createTable(String tableName,String columnFamily)

public boolean writeOne(HashMap<String,String>data,String[] keys, String tableName,String columnFamily)

public boolean writeMore(List<HashMap<String,String>>list,String[] keys,String tableName,String columnFamily)

public boolean deleteTable(String tableName)

public void close()

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

部分方法的实现如下,仅做参考:

public void connect(){

try {

//create connection to hbase

conn= ConnectionFactory.createConnection(conf)

//get Hadmin which is the database manager

Hadmin=conn.getAdmin()

} catch (IOException e) {

e.printStackTrace()

if(conn!=null){

try {

conn.close()

} catch (IOException e1) {

e1.printStackTrace()

}

}

if(Hadmin!=null){

try {

Hadmin.close()

} catch (IOException e1) {

e1.printStackTrace()

}

}

}

}

public boolean createTable(String tableName,String columnFamily){

/**

* create table

* here I limit the number of column family to 1

* So here only can create one column family's table

* **/

TableName table_name= TableName.valueOf(tableName)

LOG.info("Create table:"+tableName+" now!")

HTableDescriptor tableDesc=new HTableDescriptor(table_name)

HColumnDescriptor columnDesc=new HColumnDescriptor(columnFamily)

tableDesc.addFamily(columnDesc)

try {

this.Hadmin.createTable(tableDesc)

} catch (IOException e) {

e.printStackTrace()

return false

}

return true

}

public boolean writeOne(HashMap<String,String>data,String[] keys, String tableName,String columnFamily){

/**

* write one data to table at one time

* here I choose the first key as the rowKey,because I think the first key usually is the primary key

* **/

boolean flag=false

TableName table_name=TableName.valueOf(tableName)

byte[] column_family=columnFamily.getBytes()

try {

Table table=conn.getTable(table_name)

byte[] rowKey=data.get(keys[0]).toString().getBytes()//construct HBase table's rowKey

Put put=new Put(rowKey)

for(int i=0i<keys.lengthi++){

byte[] key=keys[i].getBytes()

byte[] value=data.get(keys[i]).toString().getBytes()

put.addColumn(column_family,key,value)

}

table.put(put)

table.close()

flag=true

} catch (IOException e) {

e.printStackTrace()

}

return flag

}

public void close(){

/**

* close connection

* **/

LOG.info("Close connection to HBase!")

if(Hadmin!=null){

try {

Hadmin.close()

} catch (IOException e) {

e.printStackTrace()

}

}

if(conn!=null){

try {

conn.close()

} catch (IOException e) {

e.printStackTrace()

}

}

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

到此,工具类已经基本完成,下面需要写一个主类:

public class deMain {

public static void main(String[] args){

//JDBC Init

Connection conn=null

String sql="select * from Info"

String databaseName="test"

String userName="root"

String password="****"

String url="jdbc:mysql://localhost:3306/"+databaseName+"?user="+userName+"&password="

+password+"&useUnicode=true&characterEncoding=utf-8"

//HBase Client Init

String tableName="test"

String columnFamily="info"

Configuration conf= HBaseConfiguration.create()

conf.set("hbase.zookeeper.quorum","127.0.0.1")

conf.set("hbase.master","localhost:9000")

}

//JDBC connection and read data

JDBCUtils ju=new JDBCUtils(databaseName,userName,password,url)

ju.connect()

ResultSet rs=ju.readData(sql)

//HBase connect

HBaseUtils hbu=new HBaseUtils(conf)

hbu.connect()

//依次读取rs中每条记录,并将其写入HBase相应表中即可

.........

//close all connection

hbu.close()

ju.close()

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

到此,大功告成!由于是通过IDE端运行hadoop程序,所以我们需要将需要用到的依赖库导入,而这个过程如果不借助maven的话,就会特别的痛苦。下图是我配置的项目依赖包,仅供参考。

这里写图片描述

注:hadoop2.7.2,hbase1.2.1,zookeeper3.4.6


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/tougao/11269965.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-14
下一篇 2023-05-14

发表评论

登录后才能评论

评论列表(0条)

保存