大数据学习教程SD版第十六篇【Hbase】

大数据学习教程SD版第十六篇【Hbase】,第1张

数据学习教程SD版第十六篇【Hbase】

Hbase 列式数据库

分布式 可扩展 基于HDFS的海量数据存储

K-V 存储 ,更像多维Map,稀疏存储

底层全是字节码存储,所以数据没有类型一说

本身不能做数据分析

1. Hbase 简介
  • 逻辑结构

r列 --rf列族 表纵向切分 Store --rs行键 – region表横向切分 切片

  • 物理存储结构

RowKey 行键 --Column Family 列族 --Column Qualifier 列 --TimeStamp 时间戳 --Type *** 作类型 --Value 值

  • 数据模型
  1. Name Space --> Database
  2. Region --> Table
  3. Row --> 一个Rowkey+ 多个列
  4. Column --> Column
  5. Time Stamp --> Version
  6. Ceil --> {rowkey,columnfamily,columnqualifier,timestamp} 最小单元
2. Hbase 简单架构
  1. RegionServer DML
    1. Data:get 、put、delete
    2. Region:split、compact
  2. Master DDL
    1. Table:create 、delete 、alter
    2. RegionServer : 分配 regions 到 每个 RgionServer
3. Hbase 安装

先要启动 HDFS 、ZK ,hbase-default.xml 在hbase-common的jar包下

  1. 下载并解压安装包
  2. 修改配置文件

hbase-env.sh

export JAVA_HOME=/opt/module/jdk1.8.0_144
#export Hbase_MASTER_OPTS="$Hbase_MASTER_OPTS -XX:PermSize=128m -XX:MaxPermSize=128m"
#export Hbase_REGIONSERVER_OPTS="$Hbase_REGIONSERVER_OPTS -XX:PermSize=128m -XX:MaxPermSize=128m"
export Hbase_MANAGES_ZK=false

hbase-site.xml

  
    hbase.rootdir
    hdfs://hadoop102:8020/hbase/
    
    
    hbase.cluster.distributed
    true
  
    
    hbase.zookeeper.quorum
    hadoop102,hadoop103,hadoop104
  
    
    hbase.zookeeper.property.dataDir
    /opt/module/zookeeper-3.5.7/zkData
  

regionservers

hadoop102
hadoop103
hadoop104
  1. 设置hadoop的core,hdfs-site.xml软连接到hbase下
ln -s /opt/module/hadoop-2.7.1/etc/hadoop/core-site.xml /opt/module/hbase-1.3.1/conf/
ln -s /opt/module/hadoop-2.7.1/etc/hadoop/hdfs-site.xml /opt/module/hbase-1.3.1/conf/
  1. 分发配置

  2. 启动

bin/start-hbase.sh
4. Hbase Shell
bin/hbase shell
# DDL
## 1.query
list
describe 'test1'
## 2.create
create 'test1','info'
## 3.update
alter 'test1', {NAME => 'info', VERSIONS => '3'}
## 4.delete
disable 'test1'
drop 'test1'
## ns 删除ns,必须先删除ns下所有table
list_namespace
list_namespace_tables 'default'

# DML
## 1.put
put 'test1','1001','info:id','001'
## 2.get、scan: 获取最新时间戳(版本)的数据
get 'test1','1001'
get 'test1','1001','info'
get 'test1','1001','info:id'
scan 'test1'
scan 'test1',{STARTROW => '1001',STOPROW => '1003'}
## 扫描10个版本内的数据
scan 'test1',{ RAW => true  ,VERSIONS => 10 }
## 3.update:加一条最新时间戳的数据,不会删除原来版本数据
put 'test1','1003','info:name','wangwuwu'
## 4.delete :标记删除
delete 'test1','1001','info:age'
deleteall 'test1','1001'
truncate 'test1'

##多 version 存储与查询
alter 'test2',{ NAME => 'info', VERSIONS => 3 }
get 'test2', '1001', {COLUMN => 'info:name',VERSIONS => 3}

COLUMN                                    CELL
 info:name                                timestamp=1640829889032, value=ccc
 info:name                                timestamp=1640829885058, value=bbb
 info:name                                timestamp=1640829878870, value=aaa
 
 # flush -> hdfs
 flush 'test2'
5. Hbase 详细架构

  • 三大组件
  1. HDFS

    • HDFS Client 把StoreFile 存储在DataNode
    • HDFS DataNode 实际存储数据
  2. Hbase

  • HMaster 元数据管理,RegionServer集群管理
  • HRegionServer
    • HLog 预写日志,记录数据 *** 作
    • HRegion 表级数据,表的切片
      • Store 列族级别数据
        • MemoStore 内存数据
        • StoreFile 磁盘数据
          • HFile 存储格式K-V
  1. ZK
    • 与Client交互,对接数据的 *** 作
    • 与HMater交互 ,存储元数据位置
6. Hbase 读写流程

读比写慢

6.1 Hbase 写数据

如果缓存中有对应信息的话,直接进行写 *** 作即可

  1. Client 向ZK 请求 meta表 位置
  2. Client 获取到meta 所在节点,请求meta 信息,数据存在哪个节点,写入缓存
  3. Client 获取到 put数据 所在节点,请求put数据
  4. put 数据的节点 把数据先写入WAL和MemoStore,并返回ack
6.2 Hbase Flush

从 memostore 刷写到 Storefile,刷写时机

  1. Regionserver大小级别:默认 headp 0.4 , 0.95
        hbase.regionserver.global.memstore.size  
         
        hbase.regionserver.global.memstore.size.lower.limit  
          
  1. Region大小级别:默认128M
        hbase.hregion.memstore.flush.size  
        134217728  
  1. 时间级别:默认1h
        hbase.regionserver.optionalcacheflushinterval  
        3600000  
6.3 Hbase 读数据
  1. Clinet 向 ZK 请求 meta 位置
  2. Client 向meta 所在节点请求 数据所在节点
  3. Client 向数据所在节点请求 读 *** 作
  4. 读的是memostore+storefile 数据,只不过会把storefile 加载进block cache,与memostore 数据比较时间戳,返回最大时间戳的数据
6.4 Hbase Compact

Hfile 文件合并

  1. Minor Compactions 大量小文件合并,不会删除小版本数据
  2. Major Compactions 大文件合并,会删除数据 默认7天
6.5 Hbase Split

Hbase 自己切分会导致数据倾斜 64M -> 256M -> …… 10G 官方建议一个列族,防止出现小文件

7. Hbase API 7.1 DDL API
package com.ipinyou.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

public class APITest {

    private static Connection conn;
    private static Admin admin;

    static {
        Configuration conf = HbaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
        try {
            conn = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            admin = conn.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    
    public static boolean isTableExists(String tableName) throws IOException {
        boolean exists = admin.tableExists(TableName.valueOf(tableName));
        return exists;
    }

    
    public static void createTable(String tableName, String... family) throws IOException {

        if (isTableExists(tableName)) {
            return;
        }

        HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
        for (String f : family) {
            HColumnDescriptor familyDesc = new HColumnDescriptor(f);
            tableDesc.addFamily(familyDesc);
        }
        admin.createTable(tableDesc);
    }

    
    public static void dropTable(String tableName) throws IOException {
        if (!isTableExists(tableName)) {
            return;
        }
        admin.disableTable(TableName.valueOf(tableName));
        admin.deleteTable(TableName.valueOf(tableName));
    }

    
    public static void createNameSpace(String ns) {
        NamespaceDescriptor nsDesc = NamespaceDescriptor.create(ns).build();
        try {
            admin.createNamespace(nsDesc);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) throws IOException {
        System.out.println(isTableExists("bigdata:test4"));
//        createTable("bigdata:test4", "info1", "info2");
//        dropTable("test4");
//        createNameSpace("bigdata");
        System.out.println(isTableExists("bigdata:test4"));

    }

}
7.2 DML API

关于删除级别与删除标记:

  1. RowKey级别 DeleteFamily
  2. Rowkey+CF级别 DF
  3. RowKey+CF+CN级别 DeleteColumn
package com.ipinyou.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class APITest2 {

    private static Connection conn;

    static {
        Configuration conf = HbaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
        try {
            conn = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    
    public static void putData(String tableName, String rowKey, String family, String column, String value) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));

        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));

        table.put(put);
        table.close();
    }

    
    public static void getData(String tableName, String rowKey, String family, String column) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
//        get.addFamily(Bytes.toBytes(family));
        get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
        Result result = table.get(get);
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            String cf = Bytes.toString(CellUtil.cloneFamily(cell));
            String c = Bytes.toString(CellUtil.cloneQualifier(cell));
            String v = Bytes.toString(CellUtil.clonevalue(cell));
            System.out.println("Family:" + cf + " Column:" + c + " Value:" + v);
        }
    }

    
    public static void scanData(String tableName) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            Cell[] cells = result.rawCells();
            for (Cell cell : cells) {
                String row = Bytes.toString(CellUtil.cloneRow(cell));
                String cf = Bytes.toString(CellUtil.cloneFamily(cell));
                String c = Bytes.toString(CellUtil.cloneQualifier(cell));
                String v = Bytes.toString(CellUtil.clonevalue(cell));
                System.out.println("Rowkey:" + row + " Family:" + cf + " Column:" + c + " Value:" + v);
            }
        }
    }

    
    public static void deleteData(String tableName, String rowKey, String cf, String column) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(rowKey));

        table.delete(delete);
    }


    public static void main(String[] args) throws IOException {
//        putData("bigdata:test4", "1001", "info1", "age", "18");
//        getData("bigdata:test4", "1001", "info1", "name");
        scanData("bigdata:test4");
//        deleteData("bigdata:test4", "1003", "", "");
//        scanData("bigdata:test4");
    }

}
8. Hbase MR

Hbase 借助 MapReduce 来实现数据分析

# 1.准备工作

# 1.1 在 hadoop-env.sh hadoop_classpath for循环下面添加
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase-1.3.1/lib/*
# 1.2 加一个参数变量
export Hbase_HOME=/opt/module/hbase-1.3.1

# 重启集群
  1. 统计表行数(官方自带)
yarn jar lib/hbase-server-1.3.1.jar rowcounter {tablename}
  1. 导入数据到Hbase Table(官方自带)
# 需要创建table
create 'fruit','info'

yarn jar lib/hbase-server-1.3.1.jar importtsv -Dimporttsv.columns=Hbase_ROW_KEY,info:name,info:color fruit hdfs://hadoop102:8020/test.tsv
  1. 当然也可以自定义MapReduce程序
# 按照官方文档实例编写
https://hbase.apache.org/book.html#mapreduce
9. Hbase Hive

Hbase 借助于Hive来实现数据分析

# 1.copy hbase jar -> hive 
# 2. 把zk信息配置到hive-site.xml中

 
    hive.zookeeper.quorum
    hadoop102,hadoop103,hadoop104
  
  
    hive.zookeeper.client.port
    2181
  

# 3. 在hive中创建hbase关联的表

create table hive_hbase_test(id string,name string, age int) 
stored by 'org.apache.hadoop.hive.hbase.HbaseStorageHandler' 
with serdeproperties("hbase.columns.mapping" = ":key,cf:name,cf:age")
tblproperties ("hbase.table.name" = "hive_hbase_test");

# 4.在hive表中进行查询分析即可
10. Hbase 存储优化

Hbase 自带高可用,后续启动的Mater后自动变成Standby状态

可以在conf下新建 backup-masters文件,写入hadoop103 hadoop104,会在启动时启动三个Master,两个处于备份状态

10.1 建表预分区

按照字典排序和比较

# 分5个区:根据数据量适当分
create 't1', 'f1', SPLITS => ['10', '20', '30', '40']
10.2 Rowkey设计

设计Rowkey 尽量均匀散列分布到每个预分区中

  • 散列Rowkey
    1. 随机数、hash
    2. 反转、拼接(加盐)
10.3 基础参数
  1. Hstore大小,默认10G
  2. flush,compact,split大小
    eHandler’
    with serdeproperties(“hbase.columns.mapping” = “:key,cf:name,cf:age”)
    tblproperties (“hbase.table.name” = “hive_hbase_test”);

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5699086.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存