基于 MySQL Binlog 的 Elasticsearch 数据同步实践

基于 MySQL Binlog 的 Elasticsearch 数据同步实践,第1张

一、背景

随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。

使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。

二、现有方法及问题

对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张MySQL 表中,这张中间表对应了业务需要的 Elasticsearch 索引,每一列对应索引中的一个Mapping 字段。通过脚本以 Crontab 的方式,读取 MySQL 中间表中 UTime 大于上一次读取时间的所有数据,即该段时间内的增量,写入Elasticsearch。

所以,一旦业务逻辑中有相应字段的数据变更,需要同时顾及 MySQL 中间表的变更;如果需要 Elasticsearch 中的数据即时性较高,还需要同时写入 Elasticsearch。

随着业务数据越来越多,MySQL 中间表的数据量越来越大。当需要在 Elasticsearch 的索引中新增 Mapping 字段时,相应的 MySQL 中间表也需要新增列,在数据量庞大的表中,扩展列的耗时是难以忍受的。

而且 Elasticsearch 索引中的 Mapping 字段随着业务发展增多,需要由业务方增加相应的写入 MySQL 中间表方法,这也带来一部分开发成本。

三、方案设计

1. 整体思路

现有的一些开源数据同步工具,如阿里的 DataX 等,主要是基于查询来获取数据源,这会存在如何确定增量(比如使用utime字段解决等)和轮询频率的问题,而我们一些业务场景对于数据同步的实时性要求比较高。为了解决上述问题,我们提出了一种基于 MySQL Binlog 来进行 MySQL 数据同步到 Elasticsearch 的思路。Binlog 是 MySQL 通过 Replication 协议用来做主从数据同步的数据,所以它有我们需要写入 Elasticsearch 的数据,并符合对数据同步时效性的要求。

使用 Binlog 数据同步 Elasticsearch,业务方就可以专注于业务逻辑对 MySQL 的 *** 作,不用再关心数据向 Elasticsearch 同步的问题,减少了不必要的同步代码,避免了扩展中间表列的长耗时问题。

经过调研后,我们采用开源项目 go-mysql-elasticsearch 实现数据同步,并针对马蜂窝技术栈和实际的业务环境进行了一些定制化开发。

2. 数据同步正确性保证

公司的所有表的 Binlog 数据属于机密数据,不能直接获取,为了满足各业务线的使用需求,采用接入 Kafka 的形式提供给使用方,并且需要使用方申请相应的 Binlog 数据使用权限。获取使用权限后,使用方以 Consumer Group 的形式读取。

这种方式保证了 Binglog 数据的安全性,但是对保证数据同步的正确性带来了挑战。因此我们设计了一些机制,来保证数据源的获取有序、完整。

1). 顺序性

通过 Kafka 获取 Binlog 数据,首先需要保证获取数据的顺序性。严格说,Kafka 是无法保证全局消息有序的,只能局部有序,所以无法保证所有 Binlog 数据都可以有序到达 Consumer。

但是每个 Partition 上的数据是有序的。为了可以按顺序拿到每一行 MySQL 记录的 Binglog,我们把每条 Binlog 按照其 Primary Key,Hash 到各个 Partition 上,保证同一条 MySQL 记录的所有 Binlog 数据都发送到同一个 Partition。

如果是多 Consumer 的情况,一个 Partition 只会分配给一个 Consumer,同样可以保证 Partition 内的数据可以有序的 Update 到 Elasticsearch 中。

2). 完整性

考虑到同步程序可能面临各种正常或异常的退出,以及 Consumer 数量变化时的 Rebalance,我们需要保证在任何情况下不能丢失 Binlog 数据。

利用 Kafka 的 Offset 机制,在确认一条 Message 数据成功写入 Elasticsearch 后,才 Commit 该条 Message 的 Offset,这样就保证了数据的完整性。而对于数据同步的使用场景,在保证了数据顺序性和完整性的情况下,重复消费是不会有影响的。

四、技术实现

1. 功能模块

配置解析模块

负责解析配置文件(toml 或 json 格式),或在配置中心(Skipper)配置的 json 字符串。包括 Kafka 集群配置、Elasticsearch 地址配置、日志记录方式配置、MySQL 库表及字段与 Elasticsearch 的 Index 和 Mapping 对应关系配置等。

规则模块

规则模块决定了一条 Binlog 数据应该写入到哪个 Elasticsearch 索引、文档_id 对应的 MySQL 字段、Binlog 中的各个 MySQL 字段与索引 Mapping 的对应关系和写入类型等。

在本地化过程中,根据我们的业务场景,增加了对 MySQL 表各字段的 where 条件判断,来过滤掉不需要的 Binlog 数据。

Kafka 相关模块

该模块负责连接 Kafka 集群,获取 Binlog 数据。

Binlog 数据解析模块

原项目中的 Binlog 数据解析针对的是原始的 Binlog 数据,包含了解析 Replication 协议的实现。在我们的使用场景中,Binlog 数据已经是由 canal 解析成的 json 字符串,所以对该模块的功能进行了简化。

binlog json字符串示例

上面是一个简化的 binlog json 字符串,通过该条 binlog 的 database 和 table 可以命中一条配置规则,根据该配置规则,把 Data 中的 key-value 构造成一个与对应 Elasticsearch 索引相匹配的 key-value map,同时包括一些数据类型的转换:

Elasticsearch相关模块

Binlog 数据解析模块生成的 key-value map,由该模块拼装成请求_bulk 接口的 update payload,写入 Elasticsearch。考虑到 MySQL 频繁更新时对 Elasticsearch 的写入压力,key-value map 会暂存到一个 slice 中,每 200ms 或 slice 长度达到一定长度时(可以通过配置调整),才会调用 Elasticsearch 的_bulk 接口,写入数据。

2. 定制化开发

1). 适应业务需求

upsert

业务中使用的索引数据可能是来自多个不同的表,同一个文档的数据来自不同表的时候,先到的数据是一条 index,后到的数据是一条 update,在我们无法控制先后顺序时,需要实现 upsert 功能。在_bulk 参数中加入

Filter

实际业务场景中,可能业务需要的数据只是某张表中的部分数据,比如用 type 字段标识该条数据来源,只需要把 type=1或2的数据同步到 Elasticsearch 中。我们扩展了规则配置,可以支持对 Binlog 指定字段的过滤需求,类似:

2)快速增量

数据同步一般分为全量和增量。接入一个业务时,首先需要把业务现有的 历史 MySQL 数据导入到 Elasticsearch 中,这部分为全量同步。在全量同步过程中以及后续增加的数据为增量数据。

在全量数据同步完成后,如果从最旧开始消费 Kafka,队列数据量很大的情况下,需要很长时间增量数据才能追上当前进度。为了更快的拿到所需的增量 Binlog,在 Consumer Group 消费 Kafka 之前,先获取各个 Topic 的 Partition 在指定时间的 offset 值,并 commit 这些 offset,这样在 Consumer Group 连接 Kafka 集群时,会从刚才提交的 offset 开始消费,可以立即拿到所需的增量 Binlog。

3). 微服务和配置中心

项目使用马蜂窝微服务部署,为新接入业务提供了快速上线支持,并且在业务 Binlog 数据突增时可以方便快速的扩容 Consumer。

马蜂窝配置中心支持了各个接入业务的配置管理,相比于开源项目中的 toml 格式配置文件,使用配置中心可以更方便的管理不同业务不同环境的配置。

五、日志与监控

从上图中可以看出,订单各个表的数据同步延时平均在 1s 左右。把延时数据接入 ElastAlert,在延时数据过多时发送报警通知。

另一个监控指标是心跳检测,单独建立一张独立于业务的表,crontab 脚本每分钟修改一次该表,同时检查上一次修改是否同步到了指定的索引,如果没有,则发送报警通知。该心跳检测,监控了整个流程上的 Kafka、微服务和 ES,任何一个会导致数据不同步的环节出问题,都会第一个接到通知。

六、结语

目前接入的最重要业务方是电商的订单索引,数据同步延时稳定在 1s 左右。这次的开源项目本地化实践,希望能为一些有 Elasticsearch 数据同步需求的业务场景提供帮助。

mysql双机热备实现原理分析,在本文经过深思熟虑和多次用不同的方式实测试后。最后在这篇文章中,用一个小例子来完成mysql双机热备的实现。

Mysql数据库没有增量备份的机制,当数据量太大的时候备份是一个很大的问题。还好mysql数据库提供了一种主从备份的机制,其实就是把主数据库的所有的数据同时写到备份的数据库中。实现mysql数据库的热备份。

要想实现双机的热备,首先要了解主从数据库服务器的版本的需求。要实现热备mysql的版本都高于3.2。还有一个基本的原则就是作为从数据库的数据版本可以高于主服务器数据库的版本,但是不可以低于主服务器的数据库版本。

当然要实现mysql双机热备,除了mysql本身自带的REPLICATION功能可以实现外,也可以用Heartbeat这个开源软件来实现。不过本文主要还是讲如何用mysql自带的REPLICATION来实现mysql双机热备的功能。

1. 准备服务器

由于Mysql不同版本之间的(二进制日志)binlog格式可能会不太一样,因此最好的搭配组合是主(Master)服务器的Mysql版本和从(Slave)服务器版本相同或者更低,主服务器的版本肯定不能高于从服务器版本。

本次我用于测试的两台服务器版本都是Mysql-5.5.17。

2. Mysql 建立主-从服务器双机热备配置步骤

2.1环境描述

A服务器(主服务器Master):59.151.15.36

B服务器(从服务器Slave):218.206.70.146

主从服务器的Mysql版本皆为5.5.17

Linux环境下

将主服务器需要同步的数据库内容进行备份一份,上传到从服务器上,保证始初时两服务器中数据库内容一致。

不过这里说明下,由于我是利用Mysql在安装后就有的数据库test进行测试的,所以两台服务器里面是没有建立表的,只不分别在test里面建立了同样的一张空表tb_mobile

Sql语句如下:

mysql>create table tb_mobile( mobile VARCHAR(20) comment'手机号码', time timestamp DEFAULT now() comment'时间' )

2.2 主服务器Master配置

2.2.1 创建同步用户

进入mysql *** 作界面,在主服务器上为从服务器建立一个连接帐户,该帐户必须授予REPLICATION SLAVE权限。因为从mysql版本3.2以后就可以通过REPLICATION对其进行双机热备的功能 *** 作。

*** 作指令如下:

mysql>grant replication slave on *.* to 'replicate'@'218.206.70.146' identified by '123456'

mysql>flush privileges

创建好同步连接帐户后,我们可以通过在从服务器(Slave)上用replicat帐户对主服务器(Master)数据库进行访问下,看下是否能连接成功。

在从服务器(Slave)上输入如下指令:

[root@YD146 ~]# mysql -h59.151.15.36 -ureplicate -p123456

如果出现下面的结果,则表示能登录成功,说明可以对这两台服务器进行双机热备进行 *** 作。

2.2.2 修改mysql配置文件

如果上面的准备工作做好,那边我们就可以进行对mysql配置文件进行修改了,首先找到mysql配置所有在目录,一般在安装好mysql服务后,都会将配置文件复制一一份出来放到/ect目录下面,并且配置文件命名为:my.cnf。即配置文件准确目录为/etc/my.cnf

(Linux下用rpm包安装的MySQL是不会安装/etc/my.cnf文件的,

至于为什么没有这个文件而MySQL却也能正常启动和作用,在点有两个说法,

第一种说法,my.cnf只是MySQL启动时的一个参数文件,可以没有它,这时MySQL会用内置的默认参数启动,

第二种说法,MySQL在启动时自动使用/usr/share/mysql目录下的my-medium.cnf文件,这种说法仅限于rpm包安装的MySQL,

解决方法,只需要复制一个/usr/share/mysql目录下的my-medium.cnf文件到/etc目录,并改名为my.cnf即可。)

找到配置文件my.cnf打开后,在[mysqld]下修改即可:

[mysqld]

server-id = 1

log-bin=mysql-bin//其中这两行是本来就有的,可以不用动,添加下面两行即可

binlog-do-db = test

binlog-ignore-db = mysql

2.2.3 重启mysql服务

修改完配置文件后,保存后,重启一下mysql服务,如果成功则没问题。

2.2.4 查看主服务器状态

进入mysql服务后,可通过指令查看Master状态,输入如下指令:

注意看里面的参数,特别前面两个File和Position,在从服务器(Slave)配置主从关系会有用到的。

注:这里使用了锁表,目的是为了产生环境中不让进新的数据,好让从服务器定位同步位置,初次同步完成后,记得解锁。

2.3 从服务器Slave配置

2.3.1修改配置文件

因为这里面是以主-从方式实现mysql双机热备的,所以在从服务器就不用在建立同步帐户了,直接打开配置文件my.cnf进行修改即可,道理还是同修改主服务器上的一样,只不过需要修改的参数不一样而已。如下:

[mysqld]

server-id = 2

log-bin=mysql-bin

replicate-do-db = test

replicate-ignore-db = mysql,information_schema,performance_schema

2.3.2重启mysql服务

修改完配置文件后,保存后,重启一下mysql服务,如果成功则没问题。

2.3.3用change mster 语句指定同步位置

这步是最关键的一步了,在进入mysql *** 作界面后,输入如下指令:

mysql>stop slave //先停步slave服务线程,这个是很重要的,如果不这样做会造成以下 *** 作不成功。

mysql>change master to

>master_host='59.151.15.36',master_user='replicate',master_password='123456',

>master_log_file=' mysql-bin.000016 ',master_log_pos=107

注:master_log_file, master_log_pos由主服务器(Master)查出的状态值中确定。也就是刚刚叫注意的。master_log_file对应File, master_log_pos对应Position。Mysql 5.x以上版本已经不支持在配置文件中指定主服务器相关选项。

遇到的问题,如果按上面步骤之后还出现如下情况:

则要重新设置slave。指令如下

mysql>stop slave

mysql>reset slave

之后停止slave线程重新开始。成功后,则可以开启slave线程了。

mysql>start slave

2.3.4查看从服务器(Slave)状态

用如下指令进行查看

mysql>show slave status\G

查看下面两项值均为Yes,即表示设置从服务器成功。

Slave_IO_Running: Yes

Slave_SQL_Running: Yes

2.4 测试同步

之前开始已经说过了在数据库test只有一个表tb_mobile没有数据,我们可以先查看下两服务器的数据库是否有数据:

Master:59.151.15.36

Slave:218.206.70.146

好了,现在可以在Master服务器中插入数据看下是否能同步。

Master:59.151.15.36

Slave:218.206.70.146

可以从上面两个截图上看出,在Master服务器上进行插入的数据在Slave服务器可以查到,这就表示双机热备配置成功了。

3. Mysql 建立主-主服务器双机热备配置步骤

服务器还是用回现在这两台服务器

3.1创建同步用户

同时在主从服务器建立一个连接帐户,该帐户必须授予REPLIATION SLAVE权限。这里因为服务器A和服务器B互为主从,所以都要分别建立一个同步用户。

服务器A:

mysql>grant replication slave on *.* to 'replicate'@'218.206.70.146' identified by '123456'

mysql>flush privileges

服务器B:

mysql>grant replication slave on *.* to 'replicate'@'59.151.15.36' identified by '123456'

mysql>flush privileges

3.2修改配置文件my.cnf

服务器A

[mysqld]

server-id = 1

log-bin=mysql-bin

binlog-do-db = test

binlog-ignore-db = mysql

#主-主形式需要多添加的部分

log-slave-updates

sync_binlog = 1

auto_increment_offset = 1

auto_increment_increment = 2

replicate-do-db = test

replicate-ignore-db = mysql,information_schema

服务器B:

[mysqld]

server-id = 2

log-bin=mysql-bin

master-slave need

replicate-do-db = test

replicate-ignore-db = mysql,information_schema,performance_schema

#主-主形式需要多添加的部分

binlog-do-db = test

binlog-ignore-db = mysql

log-slave-updates

sync_binlog = 1

auto_increment_offset = 2

auto_increment_increment = 2

3.3分别重启A服务器和B服务器上的mysql服务

重启服务器方式和上面的一样,这里就不做讲解了。

3.4分别查A服务器和B服务器作为主服务器的状态

服务器A:

服务器B:

3.5分别在A服务器和B服务器上用change master to 指定同步位置

服务器A:

mysql>change master to

>master_host='218.206.70.146',master_user='replicate',master_password='123456',

>master_log_file=' mysql-bin.000011 ',master_log_pos=497

服务器B:

mysql>change master to

>master_host='59.151.15.36',master_user='replicate',master_password='123456',

>master_log_file=' mysql-bin.000016 ',master_log_pos=107

3.6 分别在A和B服务器上重启从服务线程

mysql>start slave

3.7 分别在A和B服务器上查看从服务器状态

mysql>show slave status\G

查看下面两项值均为Yes,即表示设置从服务器成功。

Slave_IO_Running: Yes

Slave_SQL_Running: Yes

3.8 测试主-主同步例子

测试服务器A:

在服务器A上插入一条语句如下图所示:

之后在服务器B上查看是否同步如下图所示:

测试服务器B:

在服务器B上插入一条语句如下图所示:

然后在从服务器A上查看是否有同步数据如下图所示:

最后从结果可以看出主-主形式的双机热备是能成功实现的。

4. 配置参数说明

Server-id

ID值唯一的标识了复制群集中的主从服务器,因此它们必须各不相同。Master_id必须为1到232-1之间的一个正整数值,slave_id值必须为2到232-1之间的一个正整数值。

Log-bin

表示打开binlog,打开该选项才可以通过I/O写到Slave的relay-log,也是可以进行replication的前提。

Binlog-do-db

表示需要记录二进制日志的数据库。如果有多个数据可以用逗号分隔,或者使用多个binlog-do-dg选项。

Binglog-ingore-db

表示不需要记录二进制日志的数据库,如果有多个数据库可用逗号分隔,或者使用多binglog-ignore-db选项。

Replicate-do-db

表示需要同步的数据库,如果有多个数据可用逗号分隔,或者使用多个replicate-do-db选项。

Replicate-ignore-db

表示不需要同步的数据库,如果有多个数据库可用逗号分隔,或者使用多个replicate-ignore-db选项。

Master-connect-retry

master-connect-retry=n表示从服务器与主服务器的连接没有成功,则等待n秒(s)后再进行管理方式(默认设置是60s)。如果从服务器存在mater.info文件,它将忽略些选项。

Log-slave-updates

配置从库上的更新 *** 作是否写入二进制文件,如果这台从库,还要做其他从库的主库,那么就需要打这个参数,以便从库的从库能够进行日志同步。

Slave-skip-errors

在复制过程,由于各种原因导致binglo中的sql出错,默认情况下,从库会停止复制,要用户介入。可以设置slave-skip-errors来定义错误号,如果复制过程中遇到的错误是定义的错误号,便可以路过。如果从库是用来做备份,设置这个参数会存在数据不一致,不要使用。如果是分担主库的查询压力,可以考虑。

Sync_binlog=1 Or N

Sync_binlog的默认值是0,这种模式下,MySQL不会同步到磁盘中去。这样的话,Mysql依赖 *** 作系统来刷新二进制日志binary log,就像 *** 作系统刷新其他文件的机制一样。因此如果 *** 作系统或机器(不仅仅是Mysql服务器)崩溃,有可能binlog中最后的语句丢失了。要想防止这种情况,可以使用sync_binlog全局变量,使binlog在每N次binlog写入后与硬盘同步。当sync_binlog变量设置为1是最安全的,因为在crash崩溃的情况下,你的二进制日志binary log只有可能丢失最多一个语句或者一个事务。但是,这也是最慢的一种方式(除非磁盘有使用带蓄电池后备电源的缓存cache,使得同步到磁盘的 *** 作非常快)。

即使sync_binlog设置为1,出现崩溃时,也有可能表内容和binlog内容之间存在不一致性。如果使用InnoDB表,Mysql服务器处理COMMIT语句,它将整个事务写入binlog并将事务提交到InnoDB中。如果在两次 *** 作之间出现崩溃,重启时,事务被InnoDB回滚,但仍然存在binlog中。可以用-innodb-safe-binlog选项来增加InnoDB表内容和binlog之间的一致性。(注释:在Mysql 5.1版本中不需要-innodb-safe-binlog;由于引入了XA事务支持,该选项作废了),该选项可以提供更大程度的安全,使每个事务的binlog(sync_binlog=1)和(默认情况为真)InnoDB日志与硬盘同步,该选项的效果是崩溃后重启时,在滚回事务后,Mysql服务器从binlog剪切回滚的InnoDB事务。这样可以确保binlog反馈InnoDB表的确切数据等,并使从服务器保持与主服务器保持同步(不接收回滚的语句)。

Auto_increment_offset和Auto_increment_increment

Auto_increment_increment和auto_increment_offset用于主-主服务器(master-to-master)复制,并可以用来控制AUTO_INCREMENT列的 *** 作。两个变量均可以设置为全局或局部变量,并且假定每个值都可以为1到65,535之间的整数值。将其中一个变量设置为0会使该变量为1。

这两个变量影响AUTO_INCREMENT列的方式:auto_increment_increment控制列中的值的增量值,auto_increment_offset确定AUTO_INCREMENT列值的起点。

如果auto_increment_offset的值大于auto_increment_increment的值,则auto_increment_offset的值被忽略。例如:表内已有一些数据,就会用现在已有的最大自增值做为初始值。

主库上记录二进制日志,也就是binlog日志。

备库将主库的二进制日志复制到其本地的中继日志中。首先,备库会启动一个工作线程,称为I/O线程,I/O线程跟主库建立一个普通的客户端连接,然后在主库上启动一个特殊的二进制转存(Binglog Dump)线程,这个转存线程会读取主库上的二进制日志中事件,并发送给从库的I/O线程;如果主库没有更新信息将进入休眠。

备库的SQL线程执行最后一步,该线程从中继日志中读取事件并在备库执行,从而实现备库数据的更新。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/6753907.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-03-27
下一篇 2023-03-27

发表评论

登录后才能评论

评论列表(0条)

保存