【Greenplum实践】使用GPKafka实现Kafka数据导入Greenplum数据库(扩展安装文件网盘分享)

【Greenplum实践】使用GPKafka实现Kafka数据导入Greenplum数据库(扩展安装文件网盘分享),第1张

【Greenplum实践】使用GPKafka实现Kafka数据导入Greenplum数据库(扩展安装文件网盘分享)

分享资源地址及文件列表:

链接:https://pan.baidu.com/s/1XVTxKLkOYrL4pCZpFfs-Tg 
提取码:sq90

包含文件:

# 命令执行
gpkafka
# 扩展安装
gpss.control
gpss--1.0.sql
# 扩展安装时依赖文件
gpss.so
# GP 的 lib 文件
librdkafka.so.1
1.需求说明

  Kafka 是分布式消息订阅系统,有非常好的横向扩展性,可实时存储海量数据,是很常用的流数据处理中间件。物联网设备采集的数据很多时候就是通过 Kafka 进行处理的。当 Kafka 数据要入库分布式数据库 Greenplum 时,我们就需要考虑入库的效率和可靠性了。
  VMware Tanzu™ Greenplum® Streaming Server 提供了 Loading Kafka Data into Greenplum 的工具,官网描述 You can use the gpsscli or gpkafka load utilities to load Kafka data into Greenplum Database. 这里描述的是使用 gpkafka 导入 Kafka 数据的方法。

2.工具安装 2.1 环境说明

Greenplum 数据库版本为 6.13.0,ZooKeeper 版本为 3.6.0,Kafka 版本为 2.4.1。

# GP 版本信息
Greenplum Binary Version: 'postgres (Greenplum Database) 6.13.0'
# ZK 版本信息
[zk: localhost:2181(CONNECTED) 0] version
ZooKeeper CLI version: 3.6.0
# Kafka 版本信息
INFO Kafka version: 2.4.1 (org.apache.kafka.common.utils.AppInfoParser)
2.2 工具安装

首先安装 gpss-gpdb6-1.5.3-rhel7-x86_64.gppkg 文件:

# 在 GP 的安装目录进行安装
[gpadmin@tcloud greenplum-db-6.13.0]$ bin/gppkg -i /home/gpkafka/gpss-gpdb6-1.5.3-rhel7-x86_64.gppkg

# 安装成功
==========================================================================
GPSS installation is complete! To proceed, create gpss extension in the
target database with:
    "CREATE EXTENSION gpss;"
==========================================================================
20220120:08:45:44:008599 gppkg:tcloud:gpadmin-[INFO]:-gpss-gpdb6-1.5.3-rhel7-x86_64.gppkg successfully installed.

如果报错:

Traceback (most recent call last):
  File "bin/gppkg", line 7, in 
    from gppylib.mainUtils import *
importError: No module named gppylib.mainUtils

问题解决【特别说明:路径地址跟配置的数据地址要一致】GP安装教程可参考《Greenplum单机版部署》:

source /usr/local/greenplum-db/greenplum_path.sh
source /home/greenplum/.bashrc
source /home/gpadmin/.bash_profile

特别注意:扩展的安装只在当前数据库生效。

[gpadmin@tcloud ~]$ psql
psql (9.4.24)
Type "help" for help.
# 切换数据库
gpdb=# c datacenter
# 安装 gpss
datacenter=# CREATE EXTENSION gpss;

报错 1️⃣

ERROR:  could not open extension control file 
"/usr/local/greenplum-db-6.13.0/share/postgresql/extension/gpss.control": 
No such file or directory

在报错文件夹/usr/local/greenplum-db-6.13.0/share/postgresql/extension/下添加两个文件 gpss.control 和 gpss--1.0.sql。

# 添加 gpss.control 和 gpss--1.0.sql 后重新安装
datacenter=# CREATE EXTENSION gpss;

报错 2️⃣

ERROR:  could not access file "$libdir/gpfmt_gpss.so": No such file or directory

在文件夹/usr/local/greenplum-db-6.13.0/lib/postgresql下添加三个文件 gpfmt_gpss.so 、gpfmt_protobuf.so 和 gpss.so 这三个文件在 gpss--1.0.sql 内用到了。

# 添加三个文件 gpfmt_gpss.so、gpfmt_protobuf.so 和 gpss.so 后重新安装
datacenter=# CREATE EXTENSION gpss;
CREATE EXTENSION

安装成功验证:

说明: 直接按照会有以上两个报错,安装前可将缺失的文件放到指定文件。

3.实际使用 3.1 入库表

在 Greenplum 数据库创建一个简单的信息表用于测试:

-- 建表
CREATE TABLE "student_info" ( 
"id" VARCHAR ( 32 ), 
"name" VARCHAR ( 64 ), 
"gender" VARCHAR ( 8 ), 
"phone" VARCHAR ( 16 ), 
"age" int2 
) DISTRIBUTED BY ( ID );
-- 备注信息
COMMENT ON TABLE "student_info" IS '学生信息表';
COMMENT ON COLUMN "student_info"."id" IS '记录的唯一标识';
COMMENT ON COLUMN "student_info"."name" IS '姓名';
COMMENT ON COLUMN "student_info"."gender" IS '性别';
COMMENT ON COLUMN "student_info"."phone" IS '手机号';
COMMENT ON COLUMN "student_info"."age" IS '年龄';
3.2 配置文件

GPKafka 使用的配置文件student-datacenter.yaml,类似于 Flume 的 Source、Sink,和字段映射关系,内容如下:

# 目标数据库配置信息
DATAbase: datacenter
USER: gpadmin
PASSWORD: tcloud@2021
HOST: tcloud
PORT: 2345
# Kafka相关配置
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: aliyun:9092
      TOPIC: student_topic
    COLUMNS:
      - NAME: jdata
        TYPE: json
    FORMAT: json
    # 遇到错误退出的次数
    ERROR_LIMIT: 10
  OUTPUT:
    SCHEMA: public
    # 写入数据表
    TABLE: student_info
    # 字段配置( NAME 是数据库字典名 expression 对应的是 Kafka 字段名)
    MAPPING:
      - NAME: id
        expression: (jdata->>'ID')::varchar
      - NAME: name
        expression: (jdata->>'NAME')::varchar
      - NAME: gender
        expression: (jdata->>'GENDER')::varchar
      - NAME: phone
        expression: (jdata->>'PHONE')::varchar
      - NAME: age
        expression: (jdata->>'AGE')::int
  COMMIT:
    # 一次最多提交数量
    MAX_ROW: 10000
    # 等待多少时间一提交(毫秒)
    MINIMAL_INTERVAL: 20 
3.3 Kafka 生产数据

创建主题 student_topic 并启动生产者发送数据:

# 创建主题
bin/kafka-topics.sh --create 
          --bootstrap-server aliyun:9092 
          --replication-factor 1 
          --partitions 1 
          --topic student_topic
          
bin/kafka-topics.sh --create 
          --zookeeper aliyun:2181 
          --replication-factor 1 
          --partitions 1 
          --topic student_topic

# 创建生产者
kafka-console-producer.sh --broker-list aliyun:9092 --topic student_topic

测试数据:

{"ID":"0001","NAME":"Tom","GENDER":"male","PHONE":"iPhone X","AGE":35}
{"ID":"0002","NAME":"Dulcey","GENDER":"female","PHONE":"iPhone 11","AGE":29}
{"ID":"0003","NAME":"Cherryl","GENDER":"female","PHONE":"xiaomi 6","AGE":28}
{"ID":"0004","NAME":"Gertie","GENDER":"female","PHONE":"huawei mate 30","AGE":34}
{"ID":"0005","NAME":"Allen","GENDER":"male","PHONE":"honor magic 3","AGE":27}
{"ID":"0006","NAME":"Kayin","GENDER":"female","PHONE":"xiaomi mix2","AGE":26}
{"ID":"0007","NAME":"Catia","GENDER":"male","PHONE":"iPhone Xs","AGE":25}
{"ID":"0008","NAME":"Elise","GENDER":"female","PHONE":"oppo findx3","AGE":24}
{"ID":"0009","NAME":"Bessie","GENDER":"female","PHONE":"meizu 18s","AGE":23}
{"ID":"0010","NAME":"Russ","GENDER":"male","PHONE":"iqoo 8 pro","AGE":33}
3.4 GPKafka 工具启动

在 student-datacenter.yaml 所在文件夹执行以下命令,gpadmin 是 Greenplum 的用户名称。

nohup gpkafka load student-datacenter.yaml 
	--gpfdist-port=32081 > nohup-student-datacenter.out 2>&1 &

报错 1️⃣

error while loading shared libraries: 
librdkafka.so.1: 
cannot open shared object file: 
No such file or directory

在 GP 的安装目录下的 /lib 内放置 librdkafka.so.1 文件,并添加 .so 的配置:

# 查看 ld.so.conf 包含的路径
cat /etc/ld.so.conf
# 在 ld.so.conf.d 内添加 librdkafka.so.1 的路径
# 生效
/etc/ldconfig
4.总结

GPKafka工具是好用的,但是安装并不容易,很多坑  而且稳定性也不算好。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5711138.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存