我们在搞一个大数据平台的时候,数据从哪些来?一般也无非以下几个来源:
1)自己的业务系统,可能是MySQL或其他各种DB;
2)埋点;
3)爬虫;
4)其他数据源,如比你买了鲸准数据,海鹰数据等等;
这么多的数据怎么进入我们的数仓(Hive/Hbase/ClickHouse......)这个时候我们就需要CDC了。可以理解为数据抽取?数据实时同步(增量、全量)?都可以。随你。
差点漏了,CDC也有其他的使用场景,比如:
业务数据发展到一定水平,需要将大部分冷热数据从熟悉的DB迁移到其他存储进行复杂查询和分析
分库分表后,某些报表类查询无法工作,需要汇总到单库表进行 *** 作
分库分表有多个维度,需要拷贝多份数据达成冗余
通过伪数据共享(没办法引入MQ、无法共享库表)进行业务改造
慢存储→Cache之间的同步
不停服数据迁移/scheme变更
关于数据同步,只是一个引子,可以深入的学习系统的资料或技术。
1、CDC技术选型debezium :https://debezium.io/documentation/reference/1.0/connectors/mysql.html
kafka: http://kafka.apache.org/
zookeeper : http://zookeeper.apache.org/
CDC的组件有很多,比如:maxwell、canal、debezium、flinkx等等,还是比较多的。组件之间的对比,有需要可以查看其他文章,大神们都写的很清楚。这里,我结合自己的业务,选择了debezium(Debezium)文档很全,但是英文的。debezium的工作原理在这里略过,可以看官网,但我们必须要知道两件事情:
Debezium是基于kafka connector 集成开发的与kafka 高度耦合kafka 1.0 以后 connector的发展很快,confluent 有很多开源的connector 2、接下来我们就说一下从0到 1搭建一套CDC,并让他跑起来。 2.1、准备好机器,建议三台,如下:
三台机器上都要安装JDK,略过。
设置三台机器的名称,并把IP与名称映射起来,每台机器都要配置,如下:
vim /etc/hosts 192.168.1.1 hadoopMaster 192.168.1.2 hadoopSlave0 192.168.1.3 hadoopSlave1 #重启一下虚拟机 reboot
让hadoopMaster节点可以SSH无密码登录到各个hadoopSlave节点上
# 当前在hadoopMaster机器上哦 1)ssh-keygen -t rsa #生成ssh密钥,不提示输入密码 2)ssh-copy-id hadoopMaster 3)ssh-copy-id hadoopSlave0 4)ssh-copy-id hadoopSlave1 #将密钥拷贝到各节点 5)ssh hadoopSlave1 #测试免密登录2.2、准备完这些后,就可以从zookeeper集群开始了
# 安装目录:/usr/local/zookeeper # 下载zk https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.6.3/ 注意:需要下载:apache-zookeeper-3.6.3-bin.tar.gz # 解压 tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz # /usr/local/zookeeper下创建zkdata目录 # 进入zkdata vi zkData/myid ## hadoopMaster 1; hadoopSlave0 2; hadoopSlave1 3; 对应的机器只填对应的一个数字,啥也没有,包括空格 # 进入 /usr/local/zookeeper/apache-zookeeper-3.6.3-bin/conf # 复制配置文件模板 cp zoo_sample.cfg zoo.cfg # 修改此参数 dataDir=/opt/app/zookeeper/zkData # 新增下面的内容 server.1=hadoopMaster:3181:4181 server.2=hadoopSlave0:3181:4181 server.3=hadoopSlave1:3181:4181 # 3181表示follow与leader之间的通信端口 # 4181表示选举端口 # 默认2181是客户端访问端口,不用修改 # clientPort=2181 # 从Master上分发配置到其他节点 #将zookeeper目录分发到其他节点的相同目录下: scp -r /usr/local/zookeeper/apache-zookeeper-3.6.3-bin root@hadoopSlave0:/usr/local/zookeeper/ # 启动集群(每个节点都需要进行启动 *** 作) # 进入 /usr/local/zookeeper/apache-zookeeper-3.6.3-bin/bin ./zkServer.sh start # 查看集群状态 ./zkServer.sh status2.3、至此,ZK可以正常运行了。然后我们开始搭建Kafka集群。
# 依赖JDK和Zookeeper环境已提前安装 # 下载地址:http://kafka.apache.org/downloads # 我们安装的版本:kafka_2.12-2.6.0.tgz ,安装目录:/usr/local/kafka # 解压下载的安装包 tar -zxvf kafka_2.12-2.6.0.tgz # 修改配置文件 # 进入 /usr/local/kafka/kafka_2.12-2.6.0 vim config/server.properties # 修改下面的内容 # kafka节点id hadoopSlave0 = 2 hadoopSlave1 = 3 broker.id=1 # 删除topic时,是否物理删除,否则只是逻辑删除 delete.topic.enable=true # 实际存放topic数据的目录,默认kafka的日志存放在 kafka/logs 下,这里数据和日志最好分开存放 log.dirs=/usr/local/kafka/kafka_2.12-2.6.0/data # 指定我们准备好的zk集群 zookeeper.connect=hadoopMaster:2181,hadoopSlave0:2181,hadoopSlave1:2181 # 分发kafka目录到其他节点 scp -r /usr/local/kafka/kafka_2.12-2.6.0 root@hadoopSlave0:/usr/local/kafka # 启动kafka服务 cd /usr/local/kafka/kafka_2.12-2.6.0/bin # 3个节点都需要一一启动 ./kafka-server-start.sh -daemon /usr/local/kafka/kafka_2.12-2.6.0/config/server.properties2.4、给kafka配一个web版的管理页面或仪表盘,管理起来更加方便
源码: GitHub - smartloli/EFAK: A easy and high-performance monitoring system, for comprehensive monitoring and management of kafka cluster.
官网:EFAK
下载: Download - EFAK
安装文档: 2.Env & Install - Kafka Eagle
下载:kafka-eagle-bin-2.0.7.tar.gz 解压后会有:fak-web-2.0.7-bin.tar.gz
安装目录:/usr/local/kafka/kafka-eagle 解压完的路径是:/usr/local/kafka/kafka-eagle/kafka-eagle-bin-2.0.7/efak-web-2.0.7
设置Kafka-Eagle目录
vim /etc/profile export KE_HOME= /usr/local/kafka/kafka-eagle PATH=$PATH:$KE_HOME/bin
设置java目录
vim /etc/profile export JAVA_HOME= /usr/local/java/
更新环境变量
source /etc/profile
修改Kafka-Eagle配置文件
cd /usr/local/kafka/kafka-eagle/kafka-eagle-bin-2.0.7/efak-web-2.0.7/conf vim system-config.properties # multi zookeeper&kafka cluster list # zookeeper和kafka集群配置 # kafka.eagle 支持管理多个kafka集群 kafka.eagle.zk.cluster.alias=cluster1 cluster1.zk.list=hadoopMaster:2181,hadoopSlave0:2181,hadoopSlave1:2181 # kafka eagle webui port # web页面访问端口号 kafka.eagle.webui.port=8048 # kafka jdbc driver address # kafka 默认使用sqlite数据库 设置好路径 kafka.eagle.driver=org.sqlite.JDBC kafka.eagle.url=jdbc:sqlite:/usr/local/kafka/kafka-eagle/db/ke.db kafka.eagle.username=root kafka.eagle.password=www.kafka-eagle.org启动kafka-eagle
cd /usr/local/kafka/kafka-eagle/kafka-eagle-bin-2.0.7/efak-web-2.0.7/bin chmod +x ke.sh ./ke.sh start启动kafka-eagle时的部分信息,注意,这里已经给出了默认的账号及密码信息。
[2021-09-16 23:14:02] INFO: Port Progress: [##################################################] | 100% [2021-09-16 23:14:06] INFO: Config Progress: [##################################################] | 100% [2021-09-16 23:14:09] INFO: Startup Progress: [##################################################] | 100% [2021-09-16 23:13:59] INFO: Status Code[0] [2021-09-16 23:13:59] INFO: [Job done!] Welcome to ______ ______ ___ __ __ / ____/ / ____/ / | / //_/ / __/ / /_ / /| | / ,< / /___ / __/ / ___ | / /| | /_____/ /_/ /_/ |_|/_/ |_| ( Eagle For Apache Kafka® ) Version 2.0.7 -- Copyright 2016-2021 ******************************************************************* * EFAK Service has started success. * Welcome, Now you can visit 'http://192.168.11.163:8048' * Account:admin ,Password:123456 ******************************************************************* *kafka-eagle UI界面ke.sh [start|status|stop|restart|stats] *https://www.kafka-eagle.org/ *******************************************************************
首选我们从 debezium 官网 https://debezium.io/releases/1.8/ 下载:debezium-connector-mysql-1.6.2.Final-plugin.tar.gz
记得下载完后放到/usr/local/kafka/kafka_2.12-2.6.0/plugins目录。
KafkaCnnect有两个核心概念还是要了解一下:Source和Sink。 Source负责导入数据到Kafka,Sink负责从Kafka导出数据,它们都被称为Connector。依然有点懵?其他文章可以深入的学习一下。个人认为在后续的开发中还是很用的。
Kafka Connect的启动方式分为单机模式和集群模式,这里我们只关心集群模式。
在distributed模式下,启动不需要传递connector的参数,而是通过REST API来对kafka connect进行管理,包括启动、暂停、重启、恢复和查看状态的 *** 作。
必须知道的几个个topic
1-group.id (topic name默认 connect-cluster):Connect cluster group 使用唯一的名称。
2-config.storage.topic (topic name默认 connect-configs):topic 用于存储 Connector 和任务配置。
3-offset.storage.topic (topic name默认 connect-offsets) :topic 用于存储 offsets。
4-status.storage.topic (默认 connect-status):topic 用于存储状态。
分布式配置文件还是要配一下:找到kafka安装目录:/usr/local/kafka/kafka_2.12-2.6.0/config
# vi connect-distributed.properties bootstrap.servers=hadoopMaster:9092,hadoopSlave0:9092,hadoopSlave1:9092 group.id=connect-cluster key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true offset.storage.topic=connect-offsets offset.storage.replication.factor=1 #offset.storage.partitions=25 config.storage.topic=connect-configs config.storage.replication.factor=1 status.storage.topic=connect-status status.storage.replication.factor=1 #status.storage.partitions=5 offset.flush.interval.ms=10000 #rest.host.name= rest.port=8083 rest.advertised.host.name=hadoopMaster rest.advertised.port=8083 # Debezium 配置 plugin.path=/usr/local/kafka/kafka_2.12-2.6.0/plugins
启动 Kafka Connect (每个节点都要启动:这里是以分布式方式启动的,有别于本地模式哈)
# cd /usr/local/kafka/kafka_2.12-2.6.0/bin ./connect-distributed.sh -daemon ../config/connect-distributed.properties
至此CDC的部署全部完成。那怎么用呢?
3、CDC使用 3.1、Kafka Connect & Debezium 使用入门先加强一下印象:
1-kafka connect 是一个可扩展的、可靠的在kafka和其他系统之间流传输的数据工具。简而言之就是他可以通过Connector(连接器)简单、快速的将大集合数据导入和导出kafka。可以接收整个数据库或收集来自所有的应用程序的消息到kafka的topic中。
2-在任何一个机器上通过curl的方式请求确认服务是否正常:curl localhost:8083/connectors,如果办输出一个空的[],说明一切都OK了;
3-集群模式需要通过 Kafka Connect REST API 去提交connectors,具体的API不一一列出来了;
我们提交一个connectors
{ "name": "test-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "192.168.1.1", "database.port": "3306", "database.user": "root", "database.password": "......", "database.server.id": "202201", "database.server.name": "testdbserver", "database.include.list": "cdc_test_db", "database.history.kafka.bootstrap.servers": "hadoopMaster:9092,hadoopSlave0:9092,hadoopSlave1:9092", "database.history.kafka.topic": "db_history_test", "include.schema.changes": "true", "table.include.list": "t_cdc_test", "database.history.skip.unparseable.ddl": "true" } }
在三台机器的任意一台执行:curl -X POST -H "Content-Type: application/json" localhost:8083/connectors/ -d ‘上面的那小段JSON’ 或都使用Postman提交到“192.168.1.3:8083/connectors”是一样的效果。
提交成功后再执行一下:curl localhost:8083/connectors,就可以看到是这样了:[test-connector]
参数基本上都能看的懂,Debezium官网上有非常详细的说明,请果需要,请移步至Debezium官网查看更多的配置。
如果我们需要同步更多的库、表数据,提交多个connectors就行了,这里提醒一下:默认情况下Debezium是一个表自动生成一个topic,如果topic太多了,建议多张表对应一个topic,具体 *** 作不一一介绍。
最后:深入学习的话至少要做两件事:Kafka Connect REST API、Debezium官网至少过一遍;最后,你可能还需要抽2分钟上想想Kafka Connect与Debezium的关系,如果有精力,看看Debezium的源码那就更牛了。
3.2、Kafka Connect & Debezium & SparkStreaming整合到一起说到Kafka Connect & Debezium & SparkStreaming整合前,我们看看 Debezium 通过Kafka发送的数据长什么样,如下:
{ "schema": { "name": "testdbserver.cdc_test_db.t_cdc_test.Envelope", "optional": false, "type": "struct", "fields": [{ "field": "before", "name": "testdbserver.cdc_test_db.t_cdc_test.Value", "optional": true, "type": "struct", "fields": [{ "field": "Id", "optional": false, "type": "int64" }, { "field": "Name", "optional": false, "type": "string" }] }, { "field": "after", "name": "testdbserver.cdc_test_db.t_cdc_test.Value", "optional": true, "type": "struct", "fields": [{ "field": "Id", "optional": false, "type": "int64" }, { "field": "Name", "optional": false, "type": "string" }] }, { "field": "source", "name": "io.debezium.connector.mysql.Source", "optional": false, "type": "struct", "fields": [{ "field": "version", "optional": false, "type": "string" }, { "field": "connector", "optional": false, "type": "string" }, { "field": "name", "optional": false, "type": "string" }, { "field": "ts_ms", "optional": false, "type": "int64" }, { "default": "false", "field": "snapshot", "name": "io.debezium.data.Enum", "optional": true, "type": "string", "version": 1, "parameters": { "allowed": "true,last,false" } }, { "field": "db", "optional": false, "type": "string" }, { "field": "sequence", "optional": true, "type": "string" }, { "field": "table", "optional": true, "type": "string" }, { "field": "server_id", "optional": false, "type": "int64" }, { "field": "gtid", "optional": true, "type": "string" }, { "field": "file", "optional": false, "type": "string" }, { "field": "pos", "optional": false, "type": "int64" }, { "field": "row", "optional": false, "type": "int32" }, { "field": "thread", "optional": true, "type": "int64" }, { "field": "query", "optional": true, "type": "string" }] }, { "field": "op", "optional": false, "type": "string" }, { "field": "ts_ms", "optional": true, "type": "int64" }, { "field": "transaction", "optional": true, "type": "struct", "fields": [{ "field": "id", "optional": false, "type": "string" }, { "field": "total_order", "optional": false, "type": "int64" }, { "field": "data_collection_order", "optional": false, "type": "int64" }] }] }, "payload": { "op": "c", //c代表创建(或插入),u代表更新,d代表删除,r代表读(在非初始快照的情况下) "after": { "Id": 1, "Name": "1" }, "source": { "server_id": 1, "version": "1.6.2.Final", "file": "mysql-bin.000083", "connector": "mysql", "pos": 1004002502, "name": "testdbserver", "row": 0, "ts_ms": 1641377065000, "snapshot": "false", "db": "cdc_test_db", "table": "t_cdc_test" }
是不是非常的清析?哪个库、哪个表、哪个字段发生了什么样的变化、变化前是什么、变化后是什么等等一应具全。
接下,我们聊聊 SparkStreaming 实时消费Kafka......
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)