canal原理是把自己伪装成一个mysql的从节点来读取mysql主库的binlog日志。
所以需要mysql主库先开启binlog日志功能。可以参考其他帖子打开binlog功能。
ps!!!!! 这里有一个非常值得注意的问题就是canal采集到MQ数据中使用的是binlog的的row模式
一定要是row模式。并且canal中配置canal.instance.filter.regex 如果配制指定采集某几个表一定要在mysql中配置binlog_rows_query_log_events是OFF模式的。否则canal中的canal.instance.filter.regex过滤器不生效。
第二步:
canal服务解压之后
其中canal_local.properties是canal控制台配置文件
canal.properties 是canal基础服务配置文件
ht_order_sync文件夹 是canal的服务实例
ht_product_sync文件夹 是canal的服务实例
上面这些是比较关键的文件
其中ht_order_sync 和 ht_product_sync 是我自己创建的文件名字可以随便叫什么都可以
如果是全新的canal解压之后 有一个example文件那个就是样例文件。
我创建了 ht_order_sync 和 ht_product_sync 是因为我有两个业务需求是 同步订单业务 和 商品业务 所以创建了两个实例,canal启动之后会加载自己创建的文件夹。
cd 进入 ht_order_sync文件夹后 如下图
我们一般需要改的只有 instance.properties 这个文件。其他文件是记录binlog的同步位置的文件。删除之后就重置binlog的采集位置,所以不要轻易删除。
下面打开instance.properties 如下图
图中
1 是要采集的mysql的账号和密码
2 是要采集的哪张表可以配置全部也可以配置部分我是配置了部分表可以直接写库和表名,我库名用了变量后面讲怎么传进来的
3 是黑明单结合上面那个白名单用的 我固定了采集某几张表所以不要配置
4 是采集的每一行的变动会发送到配置的mq的topic中作为mysql的一条改动数据(增删改)
mq数据如下图会包含改动前和改动后的数据表名库名等等。
上图是我做的一个insert的样例数据,type类型就是insert,还有update和delete 有字段类型描述
old 是改动前的数据 因为insert *** 作所以是null 如果是update *** 作此处会有值 可以做变更监听逻辑
data是当前改动后的数据。
上面说的都是具体实例配置
下面贴出canal的实例上层配置文件也就是canal服务配置文件 canal.properties
如下图. 一张图截图放不下放了三张图
图上编号
1 是代表canal采集到信息推送到哪里。tcp是代表可以推送到程序采集模式
如果是mq配置成对应的mq比如 kafka活着rocketmq等等
2 是canal.destinations 扫描上面说的自己创建的实例 配置几个文件夹扫描几个
canal.auto.scan = true 的意思是自动扫描自己创建的实例。
所以应该可以把canal.auto.scan 配置成false然后配置canal.destinations自己创建的文件夹
即可
3 canal.mq.flatMessage = true
关注一下
采集的消息的消息方式之前好像不设置成true没采集到,已经忘了 也需要配置 mq读取 也使用这个方式。 可以参考一下官网等等。
4 配置成了rocketMQ那么就配置 相关的主题等等
结束
利用Oracle golden gate 分发数据库同步消息至ActiveMQ 引言Oracle golden gate是甲骨文推出的一款数据库同步软件,可以实现异构数据库之间的亚秒级同步,它不仅仅支持Oracle,还支持Mysql和一些业界常用数据库。Ogg可以自动抽取表级数据的动态变化,直接将同步信息作用于目标数据库。然而,在某一些场景之中,我们并不需要将同步信息直接作用于目标数据库,而是将其保存在队列中,然后在队列中取出这些信息,完成一些下游系统的业务需求,这样可以让增量同步更加灵活。本篇文章主要讲述如何,配置Ogg来抽取源表的同步信息生成trail文件(Ogg专属同步文件),并利用分发器读取trail文件,适配成可以解析的xml,将其作为消息发送给队列。本文将不再讲诉一下关于安装的步骤,直接上干货。
OGG数据同步的重要概念 oracle归档和日志
Oacle归档模式是ORACLE热备份的必要条件,特别是7X24生产的数据库。可以这么认为:归档日志是增量同步的数据源,因此必须开启。可以通过下面的的指令查看是否开了归档:
SQL>archive log list
Ogg抽取进程
开启了归档和日志,ogg就有数据源来获取同步信息。而具体的工作是由Ogg抽取进程
让属于同一个订单的消息进入一个MessageQueue
所以要解决这个消息的乱序问题,最根本的方法其实非常简单,就是得想办法让一个订单的消息进入到一个MessageQueue里去。
举个例子,比如对一个订单,先后执行了insert、update两条SQL语句,那么我们现在就必须要想办法让这个订单的2个消息都直接进入到Topic下的MessageQueue里去。
那么我们这个时候应该怎么做呢?完全可以根据订单id来进行判断,我们可以往MQ里发送消息的时候,根据订单id来判断一下,如果订单id相同,你必须保证他进入同一个MessageQueue。
我们这里可以采用取模的方法,比如有一个订单id是1000,可能有两个消息,对于这两个消息,我们必须要用订单id=1000对MessageQueue的数量进行取模,比如MessageQueue一共有15个,那么此时订单id=1000取模就是5。
通过这个方法,就可以让一个订单的消息都按照顺序进入到一个MessageQueue中去。
真的这么简单吗?获取消息的时候也得有序
我们来思考一下,真的就像上面说的那么简单,只要保证一个订单的消息都进入一个MessageQueue中就搞定了吗?
显然不是。我们必须保证推送MQ的时候,也必须是有序的。
Consumer有序处理一个订单的消息
接着我们可以想一下,一个Consumer可以处理多个MessageQueue的消息,但是一个MessageQueue只能交给一个Consumer来进行处理,所以一个订单的消息只会给一个Consumer来进行处理。
这就完了吗?万一消息处理失败了可以走重试队列吗?
在Consumer处理消息的时候,可能因为底层存储挂了导致消息处理失败,之前有说过,可以返回RECONSUME_LATER状态,然后broker会过会儿自动给我们重试。
但是这个方案绝对不能用在有序消息中,因为如果你的Consumer获取到订单的一个消息,结果处理失败了,此时返回了RECONSUME_LATER,那么这条消息会进入重试队列,过一会儿才会交给你重试。
但是此时broker会把下一条消息交给消费者来处理,万一处理成功了,就又会出现乱序问题。
对于有序消息的方案中,如果你遇到消息处理失败的场景,就必须返回SUSPEND_CURRENT_QUEUE_A_MOMENT这个状态,意思是先等一会儿,一会儿再继续处理这批消息,而不能把这批消息放入重试队列中去,然后直接处理下一批消息。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)