Hbase的Java API包含很多内容,已经比较完善。
1、HbaseConfiguration类
HbaseConfiguration类属于org.apache.hadoop.hbase包,功能是通过添加Hbase相关文件对Hbase进行配置文件对Hbase进行配置。常用的方法
1)
static org.apache.hadoop.conf.Configuration create()
通过读取默认位置(classpath)下的hbase-site.xml文件,得到各配置项的值,填充到产生Configuration的实例中。
2)
public static org.apache.hadoop.conf.Configuration create(org.apache.hadoop.conf.Configuration that)
读取指定的配置对象,并覆盖默认的Hbase配置。
获得org.apache.hadoop.conf.Configuration实例后,可以任意修改配置,例如:
Configuration conf = HbaseConfiguration.create(); conf.set("hbase.rootdir","hdfs://localhost:9000/hbase");
Connection接口
创建Connection实例
Configuration conf = HbaseConfiguration.create(); Connection connection= ConnectionFactory.createConnection(conf);
Connection提供的常用方法有以下几个。
1)
Admin getAdmin() throws IOException;
返回实施管理Hbase集群的一个管理员类Admin实例。返回的Admin不保证是线程安全的,应该为每个使用线程创建一个新实例;这是一个轻量级的 *** 作;不建议对返回的Admin进行池或缓存 *** 作。
2)
Table getTable(TableName var1) throws IOException;
返回一个Table实例,该实例不是线程安全的,应该为每个使用线程创建一个新实例。如果表不存在会抛出异常。使用实例后,调用者负责使用Table close()方法及时关闭。
3)
Configuration getConfiguration();
返回一个配置Configuration实例。
4)
boolean isClosed();
返回连接是否关闭
Put类
Put类位于org.apache.hadoop.hbase.client包,主要用来对单元格进行添加数据 *** 作。
1)Put类构造方法。Put类的常用构造方法有以下几个。
public Put(byte[] row)
指定行键row,行键类型为byte[]。
public Put(byte[] row, long ts)
指定行键row和时间戳ts,行键类型为byte[]。
public Put(ByteBuffer row)
指定行键row,行键类型为ByteBuffer。
public Put(ByteBuffer row, long ts)
指定行键row和时间戳ts,行键类型为ByteBuffer。
public Put(byte[] rowArray, int rowOffset, int rowLength)
从字符串rowArray中提取子串作为行键,参数rowOffset和rowLength分别为偏移量和截取长度。
public Put(byte[] rowArray, int rowOffset, int rowLength, long ts)
从字符串rowArray中提取子串作为行键,并指定时间戳ts
public Put(Put putToCopy)
复制构造方法
2)Put类常用方法
Get类
Get类位于org.apache.hadoop.hbase.client包,主要用来获取一行数据。
1)Get类构造方法。
public Get(byte[] row)
按指定行键row创建Get对象,行键类型为byte[]。
public Get(Get get)
复制构造方法。
2)Get类常用方法。
Delete类
Delete类位于org.apache.hadoop.hbase.client包,主要用来删除列族或列。
1)Delete类构造方法。
public Delete(byte[] row)
按指定行键row创建Delete对象。
public Delete(byte[] row, long timestamp)
按指定行键row和时间戳timestamp创建Delete对象。
public Delete(byte[] rowArray, int rowOffset, int rowLength)
按指定行键创建Delete对象。
public Delete(byte[] rowArray, int rowOffset, int rowLength, long ts)
按指定行键和时间戳ts创建Delete对象。
2)Delete类常用方法
Append类
Appand类位于org.apache.hadoop.hbase.client包,主要用在原有单元格值得基础上追加新值。
1)Append类构造方法
public Append(byte[] row)
按指定的行键row创建Append对象
public Append(byte[] rowArray, int rowOffset, int rowLength)
按指定的行键创建Append对象。
public Append(Append a)
复制构造方法。
2)Append类常用方法。
public Append add(byte[] family, byte[] qualifier, byte[] value)
在指定列族和限定符所确定的列追加值value。
public Append add(Cell cell)
在参数cell指定的位置追加值。
Scan类
Scan类属于org.apache.hadoop.hbase.client包,用来限定查询的数据,限定条件有版本号、起始行号、终止行号、列族等。
1)Scan类构造方法
public Scan(byte[] startRow)
创建Scan对象,指定开始行
public Scan(byte[] startRow, byte[] stopRow)
创建Scan对象,指定开始行和结束行
public Scan(byte[] startRow, Filter filter)
创建Scan对象,指定开始行和过滤条件
public Scan(Get get)
创建与get相同规格的Scan对象
2)Scan类常用方法。
Result类
Result类位于org.apache.hadoop.hbase.client包,主要用来存放Get和Scan *** 作后的结果,以键值对的形式存放在Map结构中。
ResultScanner接口
ResultScanner接口位于org.apache.hadoop.hbase.client包,ResultScanner的主要方法
Table接口
Table接口位于org.apache.hadoop.hbase.client包,主要用来和Hbase表进行交互,可用于从表中获取、插入、删除或扫描数据。
Table实例通过Connection实例的getTable(TableName tableName)方法获得。
HColumnDescriptor类
HColumnDescriptor属于org.apache.hadoop.hbase包,其含有列族的详细信息,例如,最大版本数、最小版本数、压缩算法、块大小、生存期、Bloom过滤器等信息。
1)HColumnDescriptor类构造方法。
public HColumnDescriptor(byte[] familyName)
使用familyName创建HColumnDescriptor对象,其他属性使用默认值
public HColumnDescriptor(String familyName)
使用familyName创建HColumnDescriptor对象,其他属性使用默认值
HColumnDescriptor(HColumnDescriptor desc)
复制构造方法
2)HColumnDescriptor类常用方法
HTableDescriptor类
HTableDescriptor类属于org.apache.hadoop.hbase包
构造方法
HTableDescriptor(TableName name)
其作用是根据给定的表名创建实例
Admin接口
Admin接口位于org.apache.hadoop.hbase.client包
下面以一个简单的程序为例,说明Hbase Java程序的开发过程
新建Java Project
添加依赖
4.0.0 com.example myHbase1.0-SNAPSHOT org.apache.hbase hbase-server1.2.6 org.apache.hbase hbase-client1.2.6
将hbase和hadoop配置文件放入资源目录
进入集群将配置文件夹中的hdfs-site.xml、core-site.xml、hbase-site.xml放入IDEA项目资源目录。
创建log4j.properties配置文件
# priority :debug> Method: %l ]%n%p:%m%n #debug log log4j.logger.debug=debug log4j.appender.debug=org.apache.log4j.DailyRollingFileAppender log4j.appender.debug.DatePattern='_'yyyy-MM-dd'.log' log4j.appender.debug.File=./src/com/hp/log/debug.log log4j.appender.debug.Append=true log4j.appender.debug.Threshold=DEBUG log4j.appender.debug.layout=org.apache.log4j.PatternLayout log4j.appender.debug.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss a} [Thread: %t][ Class:%c >> Method: %l ]%n%p:%m%n #warn log log4j.logger.warn=warn log4j.appender.warn=org.apache.log4j.DailyRollingFileAppender log4j.appender.warn.DatePattern='_'yyyy-MM-dd'.log' log4j.appender.warn.File=./src/com/hp/log/warn.log log4j.appender.warn.Append=true log4j.appender.warn.Threshold=WARN log4j.appender.warn.layout=org.apache.log4j.PatternLayout log4j.appender.warn.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss a} [Thread: %t][ Class:%c >> Method: %l ]%n%p:%m%n #error log4j.logger.error=error log4j.appender.error = org.apache.log4j.DailyRollingFileAppender log4j.appender.error.DatePattern='_'yyyy-MM-dd'.log' log4j.appender.error.File = ./src/com/hp/log/error.log log4j.appender.error.Append = true log4j.appender.error.Threshold = ERROR log4j.appender.error.layout = org.apache.log4j.PatternLayout log4j.appender.error.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss a} [Thread: %t][ Class:%c >> Method: %l ]%n%p:%m%n
新建class类
在src/main/java下创建com.example包,并在包下创建一个HbaseTest类。
package com.example; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HbaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import java.io.IOException; public class HbaseTest { Connection connection=null; Admin admin=null; public Boolean init(){ //初始化代码,读取配置文件,完成连接,获取Admin对象 Configuration conf= HbaseConfiguration.create(); // conf.set("hbase.rootdir","hdfs://localhost:9000/hbase"); try{ this.connection= ConnectionFactory.createConnection(conf); } catch (IOException e) { e.printStackTrace(); return false; } try { this.admin=this.connection.getAdmin(); } catch (IOException e) { e.printStackTrace(); try { this.connection.close(); } catch (IOException e1) { e1.printStackTrace(); return false; } } return true; } public void close(){ if(admin!=null){ try{ admin.close(); } catch (IOException e) { e.printStackTrace(); } } if(connection!=null){ try{ connection.close(); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args){ HbaseTest test=new HbaseTest(); if(! test.init())return; try{ System.out.println(test.admin.tableExists(TableName.valueOf("student"))); } catch (IOException e) { e.printStackTrace(); }finally { test.close(); } } }
运行HbaseTest
运行前需要启动集群,并保证hadoop和hbase运行正常。
显示表及其列族
showAllTable.java
package com.example; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HbaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import java.io.IOException; public class ShowAllTable extends HbaseTest{ public void showAllTable() throws IOException { HTableDescriptor []htables=admin.listTables(); for(HTableDescriptor hdes:htables){ HColumnDescriptor []cdeses=hdes.getColumnFamilies(); System.out.println(" ==================== "); System.out.println("table:"+hdes.getNameAsString()+"tColumn Families"); System.out.println(" ==================== "); for(HColumnDescriptor cdes:cdeses){ System.out.println(cdes.getNameAsString()); } } } public static void main(String []args){ ShowAllTable showAllTable=new ShowAllTable(); if(!showAllTable.init())return; try{ showAllTable.showAllTable(); } catch (IOException e) { e.printStackTrace(); }finally { showAllTable.close(); } } }
表的创建和删除
package com.example; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import java.io.IOException; public class OperationTable extends HbaseTest { private void createTable(String myTableName, String[] colFamily) throws IOException { TableName tableName=TableName.valueOf(myTableName); if(admin.tableExists(tableName)){ System.out.println("table is exists"); }else { HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName); for (String str : colFamily) { HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(str); hColumnDescriptor.setMaxVersions(5); hTableDescriptor.addFamily(hColumnDescriptor); } admin.createTable(hTableDescriptor); System.out.println("create table success!"); } } public static void main(String []args){ OperationTable operationTable=new OperationTable(); String[] colFamily={"name","id","test"}; if(!operationTable.init())return; try{ operationTable.createTable("test:HbaseTest",colFamily); } catch (IOException e) { e.printStackTrace(); } } }
删除表
private void deleteTable(String tableName) throws IOException { TableName tabName=TableName.valueOf(tableName); if(admin.tableExists(tabName)){ admin.disableTable(tabName); admin.deleteTable(tabName); } }
增加数据
1)编写增加数据insertCell方法
参数依次为表名、行键、列族、列限定符和值。
public void insertCell(String tableName, String rowKey, String colFamily, String col, String val) throws IOException { Table table=connection.getTable(TableName.valueOf(tableName)); Put put=new Put(rowKey.getBytes()); put.addColumn(colFamily.getBytes(),col.getBytes(),val.getBytes()); table.put(put); table.close(); }
重载insertCell
参数依次为表名、行键、列族、列限定符、时间戳和值。
public void insertCell(String tableName, String rowKey, String colFamily, String col,long ts, String val) throws IOException { Table table=connection.getTable(TableName.valueOf(tableName)); Put put=new Put(rowKey.getBytes()); put.addColumn(colFamily.getBytes(),col.getBytes(),ts,val.getBytes()); table.put(put); table.close(); }
编写批量增加insertList方法
public void insertList(String tableName,String[] rowKey,String colFamily,String col,String[] val) throws IOException { Table table=connection.getTable(TableName.valueOf(tableName)); ArrayListlist=new ArrayList (); for(int i=0;i
追加数据public void appendCell(String tableName, String rowKey, String colFamily, String col, String val) throws IOException { Table table=connection.getTable(TableName.valueOf(tableName)); Append append=new Append(rowKey.getBytes()); append.add(colFamily.getBytes(),col.getBytes(),val.getBytes()); table.append(append); table.close(); }
查看数据
1)显示一行的内容public void showCell(Result result,boolean flag){ Listcells=result.listCells(); if(flag) System.out.println("Row Family;QualifiertTimetamptvalue"); for(Cell cell:cells){ System.out.println("-----------------------------------"); System.out.printf("%-20s",new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength())); System.out.printf("%s:%st",new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength())); System.out.printf("%15st",cell.getTimestamp()); System.out.printf("%-20sn",new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength())); } } public void getData(String tableName,String rowKey,String colFamily,String col,int maxVersions) throws IOException { Table table=connection.getTable(TableName.valueOf(tableName)); Get get=new Get(rowKey.getBytes()); if(col.equals("")) get.addFamily(colFamily.getBytes()); else get.addColumn(colFamily.getBytes(),col.getBytes()); if(maxVersions>1){ get.setMaxVersions(maxVersions); } Result result=table.get(get); showCell(result,true); table.close(); } |
2)扫描表中的内容public void scanData(String tableName,String colFamily,String col,int maxVersion) throws IOException { Table table=connection.getTable(TableName.valueOf(tableName)); Scan scan=new Scan(); if(col.equals("")) scan.addFamily(colFamily.getBytes()); else scan.addColumn(colFamily.getBytes(),col.getBytes()); if(maxVersion!=0){ scan.setMaxVersions(maxVersion); } ResultScanner rs=table.getScanner(scan); boolean flag=true; for(Result result:rs){ showCell(result,flag); flag=false; } rs.close(); table.close(); }
删除数据
1)删除列族public void deleteFamily(String tableName,String rowKey,String colFamily,int flag,long timeStamp) throws IOException { Table table=connection.getTable(TableName.valueOf(tableName)); Delete delete=new Delete(rowKey.getBytes()); switch (flag){ case 0://删除指定列族的所有列和版本 delete.addFamily(colFamily.getBytes()); break; case 1://删除指定时间戳的版本 delete.addFamilyVersion(colFamily.getBytes(),timeStamp); break; case 2://删除小于等于时间戳的版本 delete.addFamily(colFamily.getBytes(),timeStamp); } table.delete(delete); table.close(); }2)删除列
public void deleteColumn(String tableName, String rowKey, String colFamily, String col, int flag, long timeStamp) throws IOException { Table table = connection.getTable(TableName.valueOf(tableName)); Delete delete = new Delete(rowKey.getBytes()); switch (flag){ case 0://删除列的所有版本 delete.addColumns(colFamily.getBytes(),col.getBytes()); break; case 1://删除指定时间戳的版本 delete.addColumn(colFamily.getBytes(),col.getBytes(),timeStamp); break; case 2://删除小于等于时间戳的版本 delete.addColumns(colFamily.getBytes(),col.getBytes(),timeStamp); break; case 3://删除列的最新版本 delete.addColumn(colFamily.getBytes(),col.getBytes()); } table.delete(delete); table.close(); }欢迎分享,转载请注明来源:内存溢出
评论列表(0条)