首先说下我们的业务:我们是做第三方支付的,收单那边有很多数据,技术选型上选择了灵活方便的mysql,通过强大的分表分库中间件,我们能够轻松构建百库百表的巨大mysql集群,并且实现了不错的TPS。但是运营的瓶颈就显现出来,我们需要通过各种各样的条件来查询我们的订单交易,通过我们搭建的分表分库的mysql集群很难满足要求。ElasticSearch凭借着不错的搜索性能,完美满足了我们的业务要求,那么如何将数据从mysql同步到es,就是我们今天要说的课题。
原理:
数据流方向:mysql->canal->kafka-elasticsearch
一、mysql如何配置:Mysql 的 binlog 日志作用是用来记录 mysql 内部增删等对 mysql 数据库有更新的内容的 记录(对数据库的改动),对数据库的查询 select 或 show 等不会被 binlog 日志记录;主要用于数据库的主从复制以及增量恢复。
1、修改mysql配置文件 /etc/my.cnf
log-bin = mysql-bin
binlog-format=ROW
server-id=1
2、创建一个用户,需要配置复制的权限。
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT,REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
简介:canal是阿里巴巴旗下的开源产品,主要原理是伪装成mysql的slave节点通过binlog的方式进行同步,不详细介绍了,具体参考:https://github.com/alibaba/canal/wiki
我下载了canal的1.1.5版本:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.admin-1.1.5.tar.gz
tar -zxvf canal.deployer-1.1.5.tar.gz后目录如下:
进入conf目录,修改配置文件
修改canal.properties 配置文件选项:
canal.serverMode = kafka
kafka.bootstrap.servers = 127.0.0.1:9092
canal.destinations = example
canal.mq.database.hash = false 我们把数据库配置成不参与hash到分区的计算,这样可以保证不同库业务相关的一些数据发送到同一个分区里。
我目前采用的方式是利用kafka做中间件,然后在程序里处理kafka消息完成数据的转移,相比直接连接canal,代码更加简单,而且消息中间件可以起到削峰填谷的作用。
修改example文件夹下的配置文件,也可以建example的同级目录文件夹来配置同步信息
修改instance.properties 配置文件
canal.instance.master.address=139.186.32.209:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=Root_123456
着重看下如下配置
# table regex-->我们需要监听的表
canal.instance.filter.regex=.*\..*
# table black regex-->我们需要排除的表
canal.instance.filter.black.regex=
我们需要监听的字段
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
我们需要过滤的字段
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
发送MQ配置
# mq config-->通用topic配置,如果动态的没有匹配到就会走这个topic
canal.mq.topic=example
# dynamic topic route by schema or table regex 动态topic配置,可以将不同库不同表的消息发送到不同topic上
canal.mq.dynamicTopic=mytest1.user,mytest2\..*,.*\..*
canal.mq.partition=0
# hash partition config
mq的分区数
#canal.mq.partitionsNum=3
按照哪个字段进行hash到具体分区
#canal.mq.partitionHash=test.table:id^name,.*\..*
动态mq分区数设定
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
三、kafka如何对接:先安装zookeeper
再安装kafka
这个默认大家都会啊,不做细节的讲解。
四、同步程序如何编写:接下来我们就在程序里引入kafka的消息进行增删改查,
说明一下:从canal来的消息格式,我们用如下类进行接收:
@Data @Builder @NoArgsConstructor @AllArgsConstructor public class CanalMessage { private String database; private String table; private Boolean isDdl; private Long id; private String type; private JSonObject mysqlType; private Listdata; private List old; private List pkNames; private Long es; private Long ts; private JSonObject sqlType; }
然后就是消息消费的代码:
package chen.huai.jie.springboot.data.sync.processor; import chen.huai.jie.springboot.data.sync.model.CanalMessage; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import static chen.huai.jie.springboot.data.sync.global.enums.DmlConstants.*; @Slf4j @Service public class PayOmcSystemProcessor extends baseProcessor { @KafkaListener(groupId = "data_sync", topics = {"example"}) public void listen(ConsumerRecordrecord, Acknowledgment ack) { try { log.info("收到消息:{}", record.toString()); handleRecord(record); ack.acknowledge(); } catch (Exception e) { log.error("数据同步失败:" + record.toString(), e); ack.nack(1000); } } private void handleRecord(ConsumerRecord record) { CanalMessage canalMessage = JSON.parseObject(record.value(), CanalMessage.class); if (canalMessage == null || CollectionUtils.isEmpty(canalMessage.getData())) { log.info("canalMessage is null or data list is empty."); return; } String index = canalMessage.getDatabase() + "_" + canalMessage.getTable(); switch (canalMessage.getType()) { case INSERT: case UPDATE: // 增加、修改处理 for (JSonObject jsonObject : canalMessage.getData()) { boolean success = esClient.addOrUpdate(index, jsonObject.getString("id"), jsonObject.toJSonString()); log.info("写入数据:{} 到 ES[index={}] {}", jsonObject.toJSonString(), index, success ? "成功" : "失败"); } break; case DELETE: // 删除处理 for (JSonObject jsonObject : canalMessage.getData()) { boolean success = esClient.delete(index, jsonObject.getString("id")); log.info("删除数据:{} 从 ES[index={}] {}", jsonObject.toJSonString(), index, success ? "成功" : "失败"); } break; default: break; } } }
我这里的例子比较简单,实际上在开发过程中,这个处理要比这个复杂很多,而且对吞吐量要求也要高很多。
五、如何保证高可用:有些同学肯定在想,如果canal,kafka或者同步程序任何一个挂了,同步数据流不就断了吗,是的,这个我们需要做高可用设计,极端情况下,假设我们链路上所有的程序都是单节点,我们的数据都不会丢失。
经过测试,不管是停掉canal还是kafka,还是同步程序,数据同步会中断,但是将应用启动后,数据都会同步过来。
同时我们也可以考虑canal的高可用部署:
就是部署多台canal,用zookeeper做协调:
所有配置均保持一直,只有canal.instance.mysql.slaveId参数,每个程序都配置不同,这样任意一台canal挂掉都不会影响数据同步,但是遇到一个没有解决的问题,如下供大家讨论:
https://github.com/alibaba/canal/issues/2621
六:canal-admin的使用:主要是用来管理的:
直接参考https://github.com/alibaba/canal/wiki/Canal-Admin-Guide
七、如何处理mysql分表分库同步到es:我们的业务也是通过分表分库来存储数据的,所以在数据同步的时候,需要用到canal对分表分库的支持,具体说明如下:
1、修改canal.properties
添加配置:canal.instance.global.spring.xml = classpath:spring/group-instance.xml
开启group-instance.xml,注释其他的
2、修改实例配置地址
canal.instance.master1.address=121.36.145.49:3306 canal.instance.master2.address=139.196.31.219:3306
这里值列举2个,有多个就一直写下去,这里的master1/master2与group-instance.xml里对应
修改group-instance.xml,
目前只配置了2个数据源,如果有需要就按照eventParser1的样子加一个eventParser3,修改对应的变量,再在下面增加
修改完毕后重新启动,就可以在一个instance里监听两个数据源的数据变化,其他配置不变。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)