canal原理是把自己伪装成一个mysql的从节点来读取mysql主库的binlog日志。
所以需要mysql主库先开启binlog日志功能。可以参考其他帖子打开binlog功能。
ps!!!!! 这里有一个非常值得注意的问题就是canal采集到MQ数据中使用的是binlog的的row模式
一定要是row模式。并且canal中配置canal.instance.filter.regex 如果配制指定采集某几个表一定要在mysql中配置binlog_rows_query_log_events是OFF模式的。否则canal中的canal.instance.filter.regex过滤器不生效。
第二步:
canal服务解压之后
其中canal_local.properties是canal控制台配置文件
canal.properties 是canal基础服务配置文件
ht_order_sync文件夹 是canal的服务实例
ht_product_sync文件夹 是canal的服务实例
上面这些是比较关键的文件
其中ht_order_sync 和 ht_product_sync 是我自己创建的文件名字可以随便叫什么都可以
如果是全新的canal解压之后 有一个example文件那个就是样例文件。
我创建了 ht_order_sync 和 ht_product_sync 是因为我有两个业务需求是 同步订单业务 和 商品业务 所以创建了两个实例,canal启动之后会加载自己创建的文件夹。
cd 进入 ht_order_sync文件夹后 如下图
我们一般需要改的只有 instance.properties 这个文件。其他文件是记录binlog的同步位置的文件。删除之后就重置binlog的采集位置,所以不要轻易删除。
下面打开instance.properties 如下图
图中
1 是要采集的mysql的账号和密码
2 是要采集的哪张表可以配置全部也可以配置部分我是配置了部分表可以直接写库和表名,我库名用了变量后面讲怎么传进来的
3 是黑明单结合上面那个白名单用的 我固定了采集某几张表所以不要配置
4 是采集的每一行的变动会发送到配置的mq的topic中作为mysql的一条改动数据(增删改)
mq数据如下图会包含改动前和改动后的数据表名库名等等。
上图是我做的一个insert的样例数据,type类型就是insert,还有update和delete 有字段类型描述
old 是改动前的数据 因为insert *** 作所以是null 如果是update *** 作此处会有值 可以做变更监听逻辑
data是当前改动后的数据。
上面说的都是具体实例配置
下面贴出canal的实例上层配置文件也就是canal服务配置文件 canal.properties
如下图. 一张图截图放不下放了三张图
图上编号
1 是代表canal采集到信息推送到哪里。tcp是代表可以推送到程序采集模式
如果是mq配置成对应的mq比如 kafka活着rocketmq等等
2 是canal.destinations 扫描上面说的自己创建的实例 配置几个文件夹扫描几个
canal.auto.scan = true 的意思是自动扫描自己创建的实例。
所以应该可以把canal.auto.scan 配置成false然后配置canal.destinations自己创建的文件夹
即可
3 canal.mq.flatMessage = true
关注一下
采集的消息的消息方式之前好像不设置成true没采集到,已经忘了 也需要配置 mq读取 也使用这个方式。 可以参考一下官网等等。
4 配置成了rocketMQ那么就配置 相关的主题等等
结束
因为自己项目中需要用到mysql数据同步到es中,查找了相关资料最后决定用canal来做,所以便有了本文,下面一起来看如何使用canal吧
根据 https://github.com/alibaba/canal 上的原理解释,我们知道 canal 会模拟 mysql slave 的交互协议,伪装自己为 mysql slave,然后向 mysql master 发送 dump 协议。
mysql master 收到 dump 请求,开始推送 binary log 给 slave(也就是 canal),然后 canal 解析 binary log 对象(原始为 byte流)。
经 canal 解析过的对象,我们使用起来就非常的方便了。
再根据 https://github.com/alibaba/canal/releases 提供的版本信息,你会发现 canal 其实相当于一个中间件,专门用来解析 MySQL 的 binlog 日志。canal 解析好了之后,会封装成一个数据对象,通过 protobuf3.0 协议进行交互,让 canal 客户端进行消费。
根据上面的解释,以及 canal 提供的版本信息,我们在使用 canal 的时候,首选要安装一个 canal.deployer-1.1.4.tar.gz 进行解析 MySQL 的 binlog 日志。
下载后,复制 canal.deployer-1.1.4.tar.gz 到 MySQL 主机上,比如放在 /usr/local/soft/目录下。然后依次执行下面的命令:
然后修改 canal 的配置文件 vim conf/example/instance.properties
这三项改成你自己的,比如我的配置如下:
然后保存并退出。(VI 模式下,按 Esc 输入 :wq 回车退出。)
接着,我们检查一下 MySQL 的配置。确定版本和是否开启了 binlog 日志,以及日志格式。
canal 支持 binlog 格式为 ROW 的模式。如果你没开启 binlog,并且格式是非 row 的,建议修改一下 mysql 的配置文件。
执行 mysql –help | grep my.cnf 找到 mysql 的 my.cnf 文件。
执行 vim /etc/my.cnf 命令。添加下面 3 个配置。
然后保存并退出。
接着执行 sudo service mysqld restart 重启 MySQL。
需要注意的是你的 mysql 用户,必须要有 REPLICATION SLAVE 权限。该权限授予 slave 服务器以该账户连接 master 后可以执行 replicate *** 作的权利。
如果没有权限,则使用 root 账户登录进 MySQL,执行下面的语句,创建用户,分配权限。
MySQL 启动后,就可以开启 canal 服务了。
开启后,观察 canal 服务的日志,确保服务正常。
查看 canal 的日志
确定没有问题后,开始编写我们的测试程序。
pom.xml 中导入下面的依赖。
使用JAVA进行测试
然后执行 main 方法。你再修改修改 MySQL 中的数据,你会发现所有改变都同步过来了。上面是使用的Java代码进行运行,如果想用canal.adapter来进行运行可以下载
放入服务器中,依次执行下面命令
然后修改配置文件 :
然后将需要运行存储到es的的yml文件放入到
目录下。例如:
然后开启canal-adapter服务
/usr/local/soft/canal-adapter/bin/startup.sh
查看 canal-adapter 的日志,确定没有问题后修改数据 就可以同步到es了
注意:
1、canal-adapter自带mysql连接使用的5.x的,如果自己安装的是高版本的mysql需要自己去/usr/local/soft/canal-adapter/lib增加对应的jar包
2、因项目中同步es使用的sql中有数据库中没有的字段,导致原生程序一直报异常,后修改源码中
加了一个判断后才可以
3、es中使用的date字段类型和数据库中不一致,所以这里又修改了部分源码兼容我们项目中的类型
可以根据各自情况修改。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)