HBase开发

HBase开发,第1张

HBase开发 Hbase开发 Java API简介

Hbase的Java API包含很多内容,已经比较完善。
1、HbaseConfiguration类
HbaseConfiguration类属于org.apache.hadoop.hbase包,功能是通过添加Hbase相关文件对Hbase进行配置文件对Hbase进行配置。常用的方法
1)

static org.apache.hadoop.conf.Configuration create()

通过读取默认位置(classpath)下的hbase-site.xml文件,得到各配置项的值,填充到产生Configuration的实例中。
2)

public static org.apache.hadoop.conf.Configuration create(org.apache.hadoop.conf.Configuration that)

读取指定的配置对象,并覆盖默认的Hbase配置。
获得org.apache.hadoop.conf.Configuration实例后,可以任意修改配置,例如:

		Configuration conf = HbaseConfiguration.create();
        conf.set("hbase.rootdir","hdfs://localhost:9000/hbase");

Connection接口
创建Connection实例

		Configuration conf = HbaseConfiguration.create();
        Connection connection= ConnectionFactory.createConnection(conf);

Connection提供的常用方法有以下几个。
1)

Admin getAdmin() throws IOException;

返回实施管理Hbase集群的一个管理员类Admin实例。返回的Admin不保证是线程安全的,应该为每个使用线程创建一个新实例;这是一个轻量级的 *** 作;不建议对返回的Admin进行池或缓存 *** 作。
2)

Table getTable(TableName var1) throws IOException;

返回一个Table实例,该实例不是线程安全的,应该为每个使用线程创建一个新实例。如果表不存在会抛出异常。使用实例后,调用者负责使用Table close()方法及时关闭。
3)

Configuration getConfiguration();

返回一个配置Configuration实例。
4)

boolean isClosed();

返回连接是否关闭
Put类
Put类位于org.apache.hadoop.hbase.client包,主要用来对单元格进行添加数据 *** 作。
1)Put类构造方法。Put类的常用构造方法有以下几个。

public Put(byte[] row)

指定行键row,行键类型为byte[]。

public Put(byte[] row, long ts)

指定行键row和时间戳ts,行键类型为byte[]。

public Put(ByteBuffer row)

指定行键row,行键类型为ByteBuffer。

public Put(ByteBuffer row, long ts)

指定行键row和时间戳ts,行键类型为ByteBuffer。

public Put(byte[] rowArray, int rowOffset, int rowLength)

从字符串rowArray中提取子串作为行键,参数rowOffset和rowLength分别为偏移量和截取长度。

public Put(byte[] rowArray, int rowOffset, int rowLength, long ts)

从字符串rowArray中提取子串作为行键,并指定时间戳ts

public Put(Put putToCopy)

复制构造方法
2)Put类常用方法

方法名方法说明Put add(Cell kv)添加KeyValue对象public Put addColumn(byte[] family, byte[] qualifier, byte[] value)把指定的列族、限定符和值添加到Put中public List get(byte[] family, byte[] qualifier)查询指定的列族和限定符所匹配的值,组织成KeyValue类型的List列表返回public boolean has(byte[] family, byte[] qualifier)是否有指定的列族和限定符public boolean has(byte[] family, byte[] qualifier, byte[] value)是否含有指定的列族、限定符和值public boolean has(byte[] family, byte[] qualifier, long ts)是否含有指定的列族、限定符和时间戳public boolean has(byte[] family, byte[] qualifier, long ts, byte[] value)是否含有指定的列族、限定符、时间戳和值

Get类
Get类位于org.apache.hadoop.hbase.client包,主要用来获取一行数据。
1)Get类构造方法。

public Get(byte[] row)

按指定行键row创建Get对象,行键类型为byte[]。

public Get(Get get)

复制构造方法。
2)Get类常用方法。

方法名方法说明public Get addFamily(byte[] family)获取指定的列族的所有列public Get addColumn(byte[] family, byte[] qualifier)获取指定的列族和列限定符所确定的列public Get setTimeRange(long minStamp, long maxStamp) throws IOException获取指定的时间戳范围的值public Get setTimeStamp(long timestamp) throws IOException获取指定的时间戳的值public Get setMaxVersions()把要取出的最大版本数设为用户在列族描述符中可配置的最大版本数public Get setMaxVersions(int maxVersions) throws IOException把要取出的最大版本数设为maxVersions

Delete类
Delete类位于org.apache.hadoop.hbase.client包,主要用来删除列族或列。
1)Delete类构造方法。

public Delete(byte[] row)

按指定行键row创建Delete对象。

public Delete(byte[] row, long timestamp)

按指定行键row和时间戳timestamp创建Delete对象。

public Delete(byte[] rowArray, int rowOffset, int rowLength)

按指定行键创建Delete对象。

public Delete(byte[] rowArray, int rowOffset, int rowLength, long ts)

按指定行键和时间戳ts创建Delete对象。
2)Delete类常用方法

方法名说明public Delete addFamily(byte[] family)删除指定列族的所有列、所有版本public Delete addFamilyVersion(byte[] family, long timestamp)删除指定的列族,且时间戳等于timestamp的版本public Delete addFamily(byte[] family, long timestamp)删除指定列族,其时间戳小于等于timestamp的版本public Delete addColumns(byte[] family, byte[] qualifier)删除指定列族和列限定符所确定的列的最新版本public Delete addColumns(byte[] family, byte[] qualifier, long timestamp)删除由指定列族和列限定符所确定的列,且时间戳等于timestamp的版本public Delete addColumns(byte[] family, byte[] qualifier)删除由指定列族、列限定符所确定的所有版本public Delete addColumns(byte[] family, byte[] qualifier, long timestamp)删除由指定列族、列限定符所确定的列,且时间戳小于等于timestamp的版本public Delete setTimestamp(long timestamp)设置时间戳

Append类
Appand类位于org.apache.hadoop.hbase.client包,主要用在原有单元格值得基础上追加新值。
1)Append类构造方法

public Append(byte[] row)

按指定的行键row创建Append对象

public Append(byte[] rowArray, int rowOffset, int rowLength)

按指定的行键创建Append对象。

public Append(Append a)

复制构造方法。
2)Append类常用方法。

public Append add(byte[] family, byte[] qualifier, byte[] value)

在指定列族和限定符所确定的列追加值value。

public Append add(Cell cell)

在参数cell指定的位置追加值。
Scan类
Scan类属于org.apache.hadoop.hbase.client包,用来限定查询的数据,限定条件有版本号、起始行号、终止行号、列族等。
1)Scan类构造方法

public Scan(byte[] startRow)

创建Scan对象,指定开始行

public Scan(byte[] startRow, byte[] stopRow)

创建Scan对象,指定开始行和结束行

public Scan(byte[] startRow, Filter filter)

创建Scan对象,指定开始行和过滤条件

public Scan(Get get)

创建与get相同规格的Scan对象
2)Scan类常用方法。

方法名说明public Scan addFamily(byte[] family)指定要查询的列族public Scan addColumn(byte[] family, byte[] qualifier)指定要查询的列族和列限定符指定的列public Scan setMaxVersions()指定每一列获取所有版本public Scan setMaxVersions(int maxVersions)指定每一列的最大版本数public Scan setTimeStamp(long timestamp) throws IOException查询指定时间戳的值public Scan setTimeRange(long minStamp, long maxStamp) throws IOException查询指定时间戳范围的值public Scan setStartRow(byte[] startRow)指定查询的开始行,startRow闭区间public Scan setStopRow(byte[] stopRow)指定查询的结束行,stopRow开区间

Result类
Result类位于org.apache.hadoop.hbase.client包,主要用来存放Get和Scan *** 作后的结果,以键值对的形式存放在Map结构中。

方法名说明public byte[] getValue(byte[] family, byte[] qualifier)返回列族family包含的列限定符qualifier确定的列的最新版本public Cell getColumnLatestCell(byte[] family, byte[] qualifier)返回列族family和列限定符qualifier确定的列的最新版本public List getColumnCells(byte[] family, byte[] qualifier)返回列族family和列限定符qualifier确定的列的所有单元格public NavigableMap getFamilyMap(byte[] family)返回列族family包含的列限定符qualifier和值value组成的形式的Mappublic boolean containsColumn(byte[] family, byte[] qualifier)判断是否包含由列族和列限定符所确定的列public boolean containsEmptyColumn(byte[] family, byte[] qualifier)判断是否包含由列族和列限定符所确定的空列

ResultScanner接口
ResultScanner接口位于org.apache.hadoop.hbase.client包,ResultScanner的主要方法

方法名说明public void close() throws IOException关闭scanner并释放给它的资源Result next() throws IOException取得下一行的值,返回Result实例Result[] next(int var1) throws IOException取得nbRows行的值,返回Result数组

Table接口
Table接口位于org.apache.hadoop.hbase.client包,主要用来和Hbase表进行交互,可用于从表中获取、插入、删除或扫描数据。
Table实例通过Connection实例的getTable(TableName tableName)方法获得。

方法说明TableName getName()获取表的全名public void close() throws IOException释放内部缓冲区中持有或挂起的资源,并将变化的数据更新到Tablevoid put(Put var1) throws IOException向表中添加值void put(List var1) throws IOException将List中的Put成批添加到表中void delete(Delete var1) throws IOException删除指定的行或单元格void delete(List var1) throws IOException批量删除List中的Delete对象boolean exists(Get var1) throws IOException返回Get对象指定的列是否存在Result get(Get var1) throws IOException从指定的行提取单元格数据Result[] get(List var1) throws IOException批量处理List中的Get实例ResultScanner getScanner(byte[] var1) throws IOException获取ResultScanner实例Result append(Append var1) throws IOException按指定行和列进行追加 *** 作,返回追加后的值HTableDescriptor getTableDescriptor() throws IOException获取表的HTableDescriptor实例

HColumnDescriptor类
HColumnDescriptor属于org.apache.hadoop.hbase包,其含有列族的详细信息,例如,最大版本数、最小版本数、压缩算法、块大小、生存期、Bloom过滤器等信息。
1)HColumnDescriptor类构造方法。

public HColumnDescriptor(byte[] familyName)

使用familyName创建HColumnDescriptor对象,其他属性使用默认值

public HColumnDescriptor(String familyName)

使用familyName创建HColumnDescriptor对象,其他属性使用默认值

HColumnDescriptor(HColumnDescriptor desc)

复制构造方法
2)HColumnDescriptor类常用方法

方法名说明public byte[] getName()返回字节数组类型的列族名public String getNameAsString()返回String类型的列族名public HColumnDescriptor setMaxVersions(int maxVersions)设置最大版本数public int getMaxVersions()取得最大版本数public HColumnDescriptor setMinVersions(int minVersions)设置最小版本数public int getMinVersions()取得最小版本数

HTableDescriptor类
HTableDescriptor类属于org.apache.hadoop.hbase包
构造方法
HTableDescriptor(TableName name)
其作用是根据给定的表名创建实例

方法说明public HTableDescriptor addFamily(HColumnDescriptor family)增加列族public Collection getFamilies()以集合的形式返回表中所有的列族public HColumnDescriptor[] getColumnFamilies()以数组的形式返回表中所有的列族public HColumnDescriptor getFamily(byte[] column)返回参数column指定的列族public HTableDescriptor setRegionReplication(int regionReplication)设置region的副本数public TableName getTableName()返回表名public HTableDescriptor setValue(String key, String value)设置元数据键值对public String getValue(String key)返回元数据中键对应的值

Admin接口
Admin接口位于org.apache.hadoop.hbase.client包

方法说明void addColumn(TableName var1, HColumnDescriptor var2)向一个已存在的表中添加列void deleteColumn(TableName var1, byte[] var2)从表中删除列void createTable(HTableDescriptor var1)创建表void deleteTable(TableName var1)删除表HTableDescriptor getTableDescriptor(TableName var1)取得指定表的HtableDescriptorHTableDescriptor[] listTables()列出所有的表boolean tableExists(TableName var1)检查表是否存在 Hbase Java开发过程

下面以一个简单的程序为例,说明Hbase Java程序的开发过程
新建Java Project

添加依赖



    4.0.0

    com.example
    myHbase
    1.0-SNAPSHOT
    
        
            org.apache.hbase
            hbase-server
            1.2.6
        
        
            org.apache.hbase
            hbase-client
            1.2.6
        
    


将hbase和hadoop配置文件放入资源目录
进入集群将配置文件夹中的hdfs-site.xml、core-site.xml、hbase-site.xml放入IDEA项目资源目录。

创建log4j.properties配置文件

# priority  :debug> Method: %l ]%n%p:%m%n
#debug log
log4j.logger.debug=debug
log4j.appender.debug=org.apache.log4j.DailyRollingFileAppender 
log4j.appender.debug.DatePattern='_'yyyy-MM-dd'.log'
log4j.appender.debug.File=./src/com/hp/log/debug.log
log4j.appender.debug.Append=true
log4j.appender.debug.Threshold=DEBUG
log4j.appender.debug.layout=org.apache.log4j.PatternLayout 
log4j.appender.debug.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss a} [Thread: %t][ Class:%c >> Method: %l ]%n%p:%m%n
#warn log
log4j.logger.warn=warn
log4j.appender.warn=org.apache.log4j.DailyRollingFileAppender 
log4j.appender.warn.DatePattern='_'yyyy-MM-dd'.log'
log4j.appender.warn.File=./src/com/hp/log/warn.log
log4j.appender.warn.Append=true
log4j.appender.warn.Threshold=WARN
log4j.appender.warn.layout=org.apache.log4j.PatternLayout 
log4j.appender.warn.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss a} [Thread: %t][ Class:%c >> Method: %l ]%n%p:%m%n
#error
log4j.logger.error=error
log4j.appender.error = org.apache.log4j.DailyRollingFileAppender
log4j.appender.error.DatePattern='_'yyyy-MM-dd'.log'
log4j.appender.error.File = ./src/com/hp/log/error.log 
log4j.appender.error.Append = true
log4j.appender.error.Threshold = ERROR 
log4j.appender.error.layout = org.apache.log4j.PatternLayout
log4j.appender.error.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss a} [Thread: %t][ Class:%c >> Method: %l ]%n%p:%m%n

新建class类
在src/main/java下创建com.example包,并在包下创建一个HbaseTest类。

package com.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

public class HbaseTest {
    Connection connection=null;
    Admin admin=null;
    public Boolean init(){  //初始化代码,读取配置文件,完成连接,获取Admin对象
        Configuration conf= HbaseConfiguration.create();
        // conf.set("hbase.rootdir","hdfs://localhost:9000/hbase");
        try{
            this.connection= ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        try {
            this.admin=this.connection.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
            try {
                this.connection.close();
            } catch (IOException e1) {
                e1.printStackTrace();
                return false;
            }
        }
        return true;
    }
    
    public void close(){
        if(admin!=null){
            try{
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if(connection!=null){
            try{
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    public static void main(String[] args){
        HbaseTest test=new HbaseTest();
        if(! test.init())return;
        try{
            System.out.println(test.admin.tableExists(TableName.valueOf("student")));
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            test.close();
        }
    }
}

运行HbaseTest
运行前需要启动集群,并保证hadoop和hbase运行正常。

编程实例

显示表及其列族
showAllTable.java

package com.example;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

public class ShowAllTable extends HbaseTest{
    public void showAllTable() throws IOException {
        HTableDescriptor []htables=admin.listTables();
        for(HTableDescriptor hdes:htables){
            HColumnDescriptor []cdeses=hdes.getColumnFamilies();
            System.out.println(" ==================== ");
            System.out.println("table:"+hdes.getNameAsString()+"tColumn Families");
            System.out.println(" ==================== ");
            for(HColumnDescriptor cdes:cdeses){
                System.out.println(cdes.getNameAsString());
            }
        }
    }

    public static void main(String []args){
        ShowAllTable showAllTable=new ShowAllTable();
        if(!showAllTable.init())return;
        try{
            showAllTable.showAllTable();
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            showAllTable.close();
        }
    }
}


表的创建和删除

package com.example;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;

import java.io.IOException;

public class OperationTable extends HbaseTest {
    private void createTable(String myTableName, String[] colFamily) throws IOException {
        TableName tableName=TableName.valueOf(myTableName);
        if(admin.tableExists(tableName)){
            System.out.println("table is exists");
        }else {
            HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
            for (String str : colFamily) {
                HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(str);
                hColumnDescriptor.setMaxVersions(5);
                hTableDescriptor.addFamily(hColumnDescriptor);
            }
            admin.createTable(hTableDescriptor);
            System.out.println("create table success!");
        }
    }
    public static void main(String []args){
        OperationTable operationTable=new OperationTable();
        String[] colFamily={"name","id","test"};
        if(!operationTable.init())return;
        try{
            operationTable.createTable("test:HbaseTest",colFamily);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}



删除表

    private void deleteTable(String tableName) throws IOException {
        TableName tabName=TableName.valueOf(tableName);
        if(admin.tableExists(tabName)){
            admin.disableTable(tabName);
            admin.deleteTable(tabName);
        }
    }


增加数据
1)编写增加数据insertCell方法
参数依次为表名、行键、列族、列限定符和值。

	public void insertCell(String tableName, String rowKey, String colFamily, String col, String val) throws IOException {
        Table table=connection.getTable(TableName.valueOf(tableName));
        Put put=new Put(rowKey.getBytes());
        put.addColumn(colFamily.getBytes(),col.getBytes(),val.getBytes());
        table.put(put);
        table.close();
    }


重载insertCell
参数依次为表名、行键、列族、列限定符、时间戳和值。

    public void insertCell(String tableName, String rowKey, String colFamily, String col,long ts, String val) throws IOException {
        Table table=connection.getTable(TableName.valueOf(tableName));
        Put put=new Put(rowKey.getBytes());
        put.addColumn(colFamily.getBytes(),col.getBytes(),ts,val.getBytes());
        table.put(put);
        table.close();
    }


编写批量增加insertList方法

    public void insertList(String tableName,String[] rowKey,String colFamily,String col,String[] val) throws IOException {
        Table table=connection.getTable(TableName.valueOf(tableName));
        ArrayListlist=new ArrayList();
        for(int i=0;i 


追加数据

    public void appendCell(String tableName, String rowKey, String colFamily, String col, String val) throws IOException {
        Table table=connection.getTable(TableName.valueOf(tableName));
        Append append=new Append(rowKey.getBytes());
        append.add(colFamily.getBytes(),col.getBytes(),val.getBytes());
        table.append(append);
        table.close();
    }


查看数据
1)显示一行的内容

    public void showCell(Result result,boolean flag){
        List cells=result.listCells();
        if(flag) System.out.println("Row Family;QualifiertTimetamptvalue");
        for(Cell cell:cells){
            System.out.println("-----------------------------------");
            System.out.printf("%-20s",new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength()));
            System.out.printf("%s:%st",new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength()));
            System.out.printf("%15st",cell.getTimestamp());
            System.out.printf("%-20sn",new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()));
        }
    }
    public void getData(String tableName,String rowKey,String colFamily,String col,int maxVersions) throws IOException {
        Table table=connection.getTable(TableName.valueOf(tableName));
        Get get=new Get(rowKey.getBytes());
        if(col.equals(""))
            get.addFamily(colFamily.getBytes());
        else
            get.addColumn(colFamily.getBytes(),col.getBytes());
        if(maxVersions>1){
            get.setMaxVersions(maxVersions);
        }
        Result result=table.get(get);
        showCell(result,true);
        table.close();
    }


2)扫描表中的内容

    public void scanData(String tableName,String colFamily,String col,int maxVersion) throws IOException {
        Table table=connection.getTable(TableName.valueOf(tableName));
        Scan scan=new Scan();
        if(col.equals(""))
            scan.addFamily(colFamily.getBytes());
        else
            scan.addColumn(colFamily.getBytes(),col.getBytes());
        if(maxVersion!=0){
            scan.setMaxVersions(maxVersion);
        }
        ResultScanner rs=table.getScanner(scan);
        boolean flag=true;
        for(Result result:rs){
            showCell(result,flag);
            flag=false;
        }
        rs.close();
        table.close();
    }


删除数据
1)删除列族

    public void deleteFamily(String tableName,String rowKey,String colFamily,int flag,long timeStamp) throws IOException {
        Table table=connection.getTable(TableName.valueOf(tableName));
        Delete delete=new Delete(rowKey.getBytes());
        switch (flag){
            case 0://删除指定列族的所有列和版本
                delete.addFamily(colFamily.getBytes());
                break;
            case 1://删除指定时间戳的版本
                delete.addFamilyVersion(colFamily.getBytes(),timeStamp);
                break;
            case 2://删除小于等于时间戳的版本
                delete.addFamily(colFamily.getBytes(),timeStamp);
        }
        table.delete(delete);
        table.close();
    }

2)删除列

    public void deleteColumn(String tableName, String rowKey, String colFamily, String col, int flag, long timeStamp) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(rowKey.getBytes());
        switch (flag){
            case 0://删除列的所有版本
                delete.addColumns(colFamily.getBytes(),col.getBytes());
                break;
            case 1://删除指定时间戳的版本
                delete.addColumn(colFamily.getBytes(),col.getBytes(),timeStamp);
                break;
            case 2://删除小于等于时间戳的版本
                delete.addColumns(colFamily.getBytes(),col.getBytes(),timeStamp);
                break;
            case 3://删除列的最新版本
                delete.addColumn(colFamily.getBytes(),col.getBytes());
        }
        table.delete(delete);
        table.close();
    }

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4002301.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-10-22
下一篇 2022-10-22

发表评论

登录后才能评论

评论列表(0条)

保存