Zookeeper & Kafka & Debezium & Spark 实时数据同步方案在实际项目中的使用

Zookeeper & Kafka & Debezium & Spark 实时数据同步方案在实际项目中的使用,第1张

Zookeeper & Kafka & Debezium & Spark 实时数据同步方案在实际项目中的使用

我们在搞一个大数据平台的时候,数据从哪些来?一般也无非以下几个来源:

1)自己的业务系统,可能是MySQL或其他各种DB;

2)埋点;

3)爬虫;

4)其他数据源,如比你买了鲸准数据,海鹰数据等等;

这么多的数据怎么进入我们的数仓(Hive/Hbase/ClickHouse......)这个时候我们就需要CDC了。可以理解为数据抽取?数据实时同步(增量、全量)?都可以。随你。

差点漏了,CDC也有其他的使用场景,比如:

业务数据发展到一定水平,需要将大部分冷热数据从熟悉的DB迁移到其他存储进行复杂查询和分析

分库分表后,某些报表类查询无法工作,需要汇总到单库表进行 *** 作

分库分表有多个维度,需要拷贝多份数据达成冗余

通过伪数据共享(没办法引入MQ、无法共享库表)进行业务改造

慢存储→Cache之间的同步

不停服数据迁移/scheme变更

关于数据同步,只是一个引子,可以深入的学习系统的资料或技术。

1、CDC技术选型

 debezium :https://debezium.io/documentation/reference/1.0/connectors/mysql.html

 kafka: http://kafka.apache.org/

 zookeeper :  http://zookeeper.apache.org/

CDC的组件有很多,比如:maxwell、canal、debezium、flinkx等等,还是比较多的。组件之间的对比,有需要可以查看其他文章,大神们都写的很清楚。这里,我结合自己的业务,选择了debezium(Debezium)文档很全,但是英文的。debezium的工作原理在这里略过,可以看官网,但我们必须要知道两件事情:

Debezium是基于kafka connector 集成开发的与kafka 高度耦合kafka 1.0 以后 connector的发展很快,confluent 有很多开源的connector 2、接下来我们就说一下从0到 1搭建一套CDC,并让他跑起来。 2.1、准备好机器,建议三台,如下: 名称IP用途hadoopMaster192.168.1.1Zookeeper、Kafka、Kafka-EaglehadoopSlave0192.168.1.2Zookeeper、KafkahadoopSlave1192.168.1.3Zookeeper、Kafka

三台机器上都要安装JDK,略过。

设置三台机器的名称,并把IP与名称映射起来,每台机器都要配置,如下:

vim /etc/hosts

192.168.1.1 hadoopMaster
192.168.1.2 hadoopSlave0
192.168.1.3 hadoopSlave1

#重启一下虚拟机
reboot

让hadoopMaster节点可以SSH无密码登录到各个hadoopSlave节点上

# 当前在hadoopMaster机器上哦

1)ssh-keygen  -t  rsa          #生成ssh密钥,不提示输入密码
2)ssh-copy-id  hadoopMaster
3)ssh-copy-id  hadoopSlave0
4)ssh-copy-id  hadoopSlave1    #将密钥拷贝到各节点
5)ssh  hadoopSlave1            #测试免密登录
2.2、准备完这些后,就可以从zookeeper集群开始了
# 安装目录:/usr/local/zookeeper

# 下载zk https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.6.3/ 注意:需要下载:apache-zookeeper-3.6.3-bin.tar.gz

# 解压

    tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz

# /usr/local/zookeeper下创建zkdata目录

# 进入zkdata

    vi zkData/myid
    ## hadoopMaster 1; hadoopSlave0 2; hadoopSlave1 3; 对应的机器只填对应的一个数字,啥也没有,包括空格

# 进入 /usr/local/zookeeper/apache-zookeeper-3.6.3-bin/conf

    # 复制配置文件模板
    cp zoo_sample.cfg zoo.cfg
    # 修改此参数
    dataDir=/opt/app/zookeeper/zkData
    
    # 新增下面的内容
    server.1=hadoopMaster:3181:4181
    server.2=hadoopSlave0:3181:4181
    server.3=hadoopSlave1:3181:4181
    
    # 3181表示follow与leader之间的通信端口
    # 4181表示选举端口
    # 默认2181是客户端访问端口,不用修改
    # clientPort=2181
# 从Master上分发配置到其他节点

    #将zookeeper目录分发到其他节点的相同目录下:
    
    scp -r /usr/local/zookeeper/apache-zookeeper-3.6.3-bin root@hadoopSlave0:/usr/local/zookeeper/

# 启动集群(每个节点都需要进行启动 *** 作)

    # 进入 /usr/local/zookeeper/apache-zookeeper-3.6.3-bin/bin
    ./zkServer.sh start

    # 查看集群状态
    ./zkServer.sh status
2.3、至此,ZK可以正常运行了。然后我们开始搭建Kafka集群。
# 依赖JDK和Zookeeper环境已提前安装

# 下载地址:http://kafka.apache.org/downloads

# 我们安装的版本:kafka_2.12-2.6.0.tgz ,安装目录:/usr/local/kafka

# 解压下载的安装包

    tar -zxvf kafka_2.12-2.6.0.tgz

# 修改配置文件

    # 进入 /usr/local/kafka/kafka_2.12-2.6.0
    vim config/server.properties

    # 修改下面的内容
    # kafka节点id hadoopSlave0 = 2 hadoopSlave1 = 3
    broker.id=1
    
    # 删除topic时,是否物理删除,否则只是逻辑删除
    delete.topic.enable=true
    
    # 实际存放topic数据的目录,默认kafka的日志存放在 kafka/logs 下,这里数据和日志最好分开存放
    log.dirs=/usr/local/kafka/kafka_2.12-2.6.0/data
    
    # 指定我们准备好的zk集群
    zookeeper.connect=hadoopMaster:2181,hadoopSlave0:2181,hadoopSlave1:2181

# 分发kafka目录到其他节点

    scp -r /usr/local/kafka/kafka_2.12-2.6.0 root@hadoopSlave0:/usr/local/kafka

# 启动kafka服务

   cd /usr/local/kafka/kafka_2.12-2.6.0/bin
# 3个节点都需要一一启动
   ./kafka-server-start.sh -daemon /usr/local/kafka/kafka_2.12-2.6.0/config/server.properties
2.4、给kafka配一个web版的管理页面或仪表盘,管理起来更加方便

源码: GitHub - smartloli/EFAK: A easy and high-performance monitoring system, for comprehensive monitoring and management of kafka cluster.

官网:EFAK

下载: Download - EFAK

安装文档: 2.Env & Install - Kafka Eagle

下载:kafka-eagle-bin-2.0.7.tar.gz 解压后会有:fak-web-2.0.7-bin.tar.gz

安装目录:/usr/local/kafka/kafka-eagle 解压完的路径是:/usr/local/kafka/kafka-eagle/kafka-eagle-bin-2.0.7/efak-web-2.0.7

设置Kafka-Eagle目录

vim /etc/profile
export KE_HOME= /usr/local/kafka/kafka-eagle
PATH=$PATH:$KE_HOME/bin

设置java目录

vim /etc/profile export JAVA_HOME= /usr/local/java/

更新环境变量

source /etc/profile

修改Kafka-Eagle配置文件

   cd /usr/local/kafka/kafka-eagle/kafka-eagle-bin-2.0.7/efak-web-2.0.7/conf
    vim system-config.properties
    
    # multi zookeeper&kafka cluster list
    # zookeeper和kafka集群配置
    # kafka.eagle 支持管理多个kafka集群
    
    kafka.eagle.zk.cluster.alias=cluster1
    cluster1.zk.list=hadoopMaster:2181,hadoopSlave0:2181,hadoopSlave1:2181
    
    # kafka eagle webui port 
    # web页面访问端口号
   
    kafka.eagle.webui.port=8048
    
    # kafka jdbc driver address
    # kafka 默认使用sqlite数据库 设置好路径
    
    kafka.eagle.driver=org.sqlite.JDBC
    kafka.eagle.url=jdbc:sqlite:/usr/local/kafka/kafka-eagle/db/ke.db
    kafka.eagle.username=root
    kafka.eagle.password=www.kafka-eagle.org

启动kafka-eagle
cd /usr/local/kafka/kafka-eagle/kafka-eagle-bin-2.0.7/efak-web-2.0.7/bin
chmod +x ke.sh
./ke.sh start
启动kafka-eagle时的部分信息,注意,这里已经给出了默认的账号及密码信息。
[2021-09-16 23:14:02] INFO: Port Progress: [##################################################] | 100%
    [2021-09-16 23:14:06] INFO: Config Progress: [##################################################] | 100%
    [2021-09-16 23:14:09] INFO: Startup Progress: [##################################################] | 100%
    [2021-09-16 23:13:59] INFO: Status Code[0]
    [2021-09-16 23:13:59] INFO: [Job done!]
    Welcome to
        ______    ______    ___     __ __
    / ____/   / ____/   /   |   / //_/
    / __/     / /_      / /| |  / ,<
    / /___    / __/     / ___ | / /| |
    /_____/   /_/       /_/  |_|/_/ |_|
    ( Eagle For Apache Kafka® )

    Version 2.0.7 -- Copyright 2016-2021
    *******************************************************************
    * EFAK Service has started success.
    * Welcome, Now you can visit 'http://192.168.11.163:8048'
    * Account:admin ,Password:123456
    *******************************************************************
    *  ke.sh [start|status|stop|restart|stats] 
    *  https://www.kafka-eagle.org/ 
    *******************************************************************

kafka-eagle UI界面
  • 说明:三台机器kafka-eagle只配置一台即可。 2.5、接下来就可以配置Kafka Connect了。

    首选我们从 debezium 官网  https://debezium.io/releases/1.8/ 下载:debezium-connector-mysql-1.6.2.Final-plugin.tar.gz

    记得下载完后放到/usr/local/kafka/kafka_2.12-2.6.0/plugins目录。

    KafkaCnnect有两个核心概念还是要了解一下:Source和Sink。 Source负责导入数据到Kafka,Sink负责从Kafka导出数据,它们都被称为Connector。依然有点懵?其他文章可以深入的学习一下。个人认为在后续的开发中还是很用的。

    Kafka Connect的启动方式分为单机模式和集群模式,这里我们只关心集群模式。

    在distributed模式下,启动不需要传递connector的参数,而是通过REST API来对kafka connect进行管理,包括启动、暂停、重启、恢复和查看状态的 *** 作。

    必须知道的几个个topic

    1-group.id (topic name默认 connect-cluster):Connect cluster group 使用唯一的名称。

     2-config.storage.topic (topic name默认 connect-configs):topic 用于存储 Connector 和任务配置。

    3-offset.storage.topic (topic name默认 connect-offsets) :topic 用于存储 offsets。

    4-status.storage.topic (默认 connect-status):topic 用于存储状态。

    分布式配置文件还是要配一下:找到kafka安装目录:/usr/local/kafka/kafka_2.12-2.6.0/config

    # vi connect-distributed.properties
    
    bootstrap.servers=hadoopMaster:9092,hadoopSlave0:9092,hadoopSlave1:9092
    
    group.id=connect-cluster
    
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=1
    #offset.storage.partitions=25
    
    config.storage.topic=connect-configs
    config.storage.replication.factor=1
    
    status.storage.topic=connect-status
    status.storage.replication.factor=1
    #status.storage.partitions=5
    
    offset.flush.interval.ms=10000
    
    #rest.host.name=
    rest.port=8083
    
    rest.advertised.host.name=hadoopMaster
    rest.advertised.port=8083
    
    # Debezium 配置
    plugin.path=/usr/local/kafka/kafka_2.12-2.6.0/plugins

    启动 Kafka Connect (每个节点都要启动:这里是以分布式方式启动的,有别于本地模式哈)

    # cd /usr/local/kafka/kafka_2.12-2.6.0/bin
    
    ./connect-distributed.sh -daemon ../config/connect-distributed.properties

    至此CDC的部署全部完成。那怎么用呢?

    3、CDC使用 3.1、Kafka Connect & Debezium 使用入门

    先加强一下印象:

    1-kafka connect 是一个可扩展的、可靠的在kafka和其他系统之间流传输的数据工具。简而言之就是他可以通过Connector(连接器)简单、快速的将大集合数据导入和导出kafka。可以接收整个数据库或收集来自所有的应用程序的消息到kafka的topic中。

    2-在任何一个机器上通过curl的方式请求确认服务是否正常:curl localhost:8083/connectors,如果办输出一个空的[],说明一切都OK了;

    3-集群模式需要通过 Kafka Connect REST API 去提交connectors,具体的API不一一列出来了;

    我们提交一个connectors

    {
    	"name": "test-connector",
    	"config": {
    		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
    		"tasks.max": "1",
    		"database.hostname": "192.168.1.1",
    		"database.port": "3306",
    		"database.user": "root",
    		"database.password": "......",
    		"database.server.id": "202201",
    		"database.server.name": "testdbserver",
    		"database.include.list": "cdc_test_db",
    		"database.history.kafka.bootstrap.servers": "hadoopMaster:9092,hadoopSlave0:9092,hadoopSlave1:9092",
    		"database.history.kafka.topic": "db_history_test",
    		"include.schema.changes": "true",
    		"table.include.list": "t_cdc_test",
    		"database.history.skip.unparseable.ddl": "true"
    	}
    }

    在三台机器的任意一台执行:curl -X POST -H "Content-Type: application/json" localhost:8083/connectors/ -d ‘上面的那小段JSON’ 或都使用Postman提交到“192.168.1.3:8083/connectors”是一样的效果。

    提交成功后再执行一下:curl localhost:8083/connectors,就可以看到是这样了:[test-connector]

    参数基本上都能看的懂,Debezium官网上有非常详细的说明,请果需要,请移步至Debezium官网查看更多的配置。

    如果我们需要同步更多的库、表数据,提交多个connectors就行了,这里提醒一下:默认情况下Debezium是一个表自动生成一个topic,如果topic太多了,建议多张表对应一个topic,具体 *** 作不一一介绍。

    最后:深入学习的话至少要做两件事:Kafka Connect REST API、Debezium官网至少过一遍;最后,你可能还需要抽2分钟上想想Kafka Connect与Debezium的关系,如果有精力,看看Debezium的源码那就更牛了。

    3.2、Kafka Connect & Debezium & SparkStreaming整合到一起

    说到Kafka Connect & Debezium & SparkStreaming整合前,我们看看 Debezium 通过Kafka发送的数据长什么样,如下:

    {
    	"schema": {
    		"name": "testdbserver.cdc_test_db.t_cdc_test.Envelope",
    		"optional": false,
    		"type": "struct",
    		"fields": [{
    			"field": "before",
    			"name": "testdbserver.cdc_test_db.t_cdc_test.Value",
    			"optional": true,
    			"type": "struct",
    			"fields": [{
    				"field": "Id",
    				"optional": false,
    				"type": "int64"
    			}, {
    				"field": "Name",
    				"optional": false,
    				"type": "string"
    			}]
    		}, {
    			"field": "after",
    			"name": "testdbserver.cdc_test_db.t_cdc_test.Value",
    			"optional": true,
    			"type": "struct",
    			"fields": [{
    				"field": "Id",
    				"optional": false,
    				"type": "int64"
    			}, {
    				"field": "Name",
    				"optional": false,
    				"type": "string"
    			}]
    		}, {
    			"field": "source",
    			"name": "io.debezium.connector.mysql.Source",
    			"optional": false,
    			"type": "struct",
    			"fields": [{
    				"field": "version",
    				"optional": false,
    				"type": "string"
    			}, {
    				"field": "connector",
    				"optional": false,
    				"type": "string"
    			}, {
    				"field": "name",
    				"optional": false,
    				"type": "string"
    			}, {
    				"field": "ts_ms",
    				"optional": false,
    				"type": "int64"
    			}, {
    				"default": "false",
    				"field": "snapshot",
    				"name": "io.debezium.data.Enum",
    				"optional": true,
    				"type": "string",
    				"version": 1,
    				"parameters": {
    					"allowed": "true,last,false"
    				}
    			}, {
    				"field": "db",
    				"optional": false,
    				"type": "string"
    			}, {
    				"field": "sequence",
    				"optional": true,
    				"type": "string"
    			}, {
    				"field": "table",
    				"optional": true,
    				"type": "string"
    			}, {
    				"field": "server_id",
    				"optional": false,
    				"type": "int64"
    			}, {
    				"field": "gtid",
    				"optional": true,
    				"type": "string"
    			}, {
    				"field": "file",
    				"optional": false,
    				"type": "string"
    			}, {
    				"field": "pos",
    				"optional": false,
    				"type": "int64"
    			}, {
    				"field": "row",
    				"optional": false,
    				"type": "int32"
    			}, {
    				"field": "thread",
    				"optional": true,
    				"type": "int64"
    			}, {
    				"field": "query",
    				"optional": true,
    				"type": "string"
    			}]
    		}, {
    			"field": "op",
    			"optional": false,
    			"type": "string"
    		}, {
    			"field": "ts_ms",
    			"optional": true,
    			"type": "int64"
    		}, {
    			"field": "transaction",
    			"optional": true,
    			"type": "struct",
    			"fields": [{
    				"field": "id",
    				"optional": false,
    				"type": "string"
    			}, {
    				"field": "total_order",
    				"optional": false,
    				"type": "int64"
    			}, {
    				"field": "data_collection_order",
    				"optional": false,
    				"type": "int64"
    			}]
    		}]
    	},
    	"payload": {
    		"op": "c", //c代表创建(或插入),u代表更新,d代表删除,r代表读(在非初始快照的情况下)
    		"after": {
    			"Id": 1,
    			"Name": "1"
    		},
    		"source": {
    			"server_id": 1,
    			"version": "1.6.2.Final",
    			"file": "mysql-bin.000083",
    			"connector": "mysql",
    			"pos": 1004002502,
    			"name": "testdbserver",
    			"row": 0,
    			"ts_ms": 1641377065000,
    			"snapshot": "false",
    			"db": "cdc_test_db",
    			"table": "t_cdc_test"
    		}

    是不是非常的清析?哪个库、哪个表、哪个字段发生了什么样的变化、变化前是什么、变化后是什么等等一应具全。

    接下,我们聊聊 SparkStreaming 实时消费Kafka......

    欢迎分享,转载请注明来源:内存溢出

    原文地址: http://outofmemory.cn/zaji/5705472.html

  • (0)
    打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
    上一篇 2022-12-17
    下一篇 2022-12-17

    发表评论

    登录后才能评论

    评论列表(0条)

    保存