Canal(Mysql)数据同步工具使用

Canal(Mysql)数据同步工具使用,第1张

Canal(Mysql)数据同步工具使用 1. Canal介绍

阿里巴巴 B2B 公司,因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所以衍生出了同步杭州和美国异地机房的需求,从 2010 年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前。Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)

2.Canal工作原理 2.1 Mysql主从复制原理

1)Master 主库将改变记录,写到二进制日志(Binary Log)中;
2)Slave 从库向 MySQL Master 发送 dump 协议,将 Master 主库的 binary log events 拷贝
到它的中继日志(relay log);
3)Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。

2.2 Mysql bin-log开启

详情见Maxwell简单使用2.2.1章节

3 Canal使用 3.1 Canal部署

Canal需要部署到linux服务器上

  1. 上传canal.deployer-1.1.2.tar.gz包到服务器的/opt/software下
  2. 解压 tar -zxvf canal.deployer-1.1.2.tar.gz -C ./canal/ 目录下
  3. 配置canal.properties和instance.properties文件中内容
    cancal.properties

instance.properties

4. 启动canal ./bin/startup.sh

3.2客户端连接Canal
  1. 创建一个java的maven工程
  2. pom.xml中添加如下配置
		
			com.alibaba.otter
			canal.client
			1.1.2
		
  1. 创建一个CanalClient.java文件
  2. 内容如下
  //连接服务端canal
        CanalConnector example = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.10.130", 11111), "example", "", "");
        //开始循环订阅数据
        while (true) {
            //开始连接
            example.connect();
            //订阅某个库或者表数据
            example.subscribe("test-canal.*");
            //获取变化数据
            Message message = example.get(100);
            //拿到数据
            List entries = message.getEntries();
            //判断是否存在数据
            if (entries.size() <= 0) {
                log.info("没有数据");
                Thread.sleep(1000L);
            } else {
            //开始遍历数据
                for (CanalEntry.Entry e : entries) {
                    String tableName = e.getHeader().getTableName();
                    CanalEntry.EntryType entryType = e.getEntryType();
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        ByteString storevalue = e.getStorevalue();
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storevalue);
                        List rowDatasList = rowChange.getRowDatasList();
                        for (CanalEntry.RowData row : rowDatasList) {
                            Map beforeMap = new HashMap<>(1 << 4);
                            List beforeColumnsList = row.getBeforeColumnsList();
                            for (CanalEntry.Column c : beforeColumnsList) {
                                beforeMap.put(c.getName(), c.getValue());
                            }
                            Map afterMap = new HashMap<>(1 << 4);
                            List afterColumnsList = row.getAfterColumnsList();
                            for (CanalEntry.Column c : afterColumnsList) {
                                afterMap.put(c.getName(), c.getValue());
                            }
                            log.info("表名:{},之前数据:{},之后数据:{}", tableName, JSON.toJSON(beforeMap), JSON.toJSON(afterMap));
                        }
                    }
                }
            }
        }
  1. 数据变化格式
4 Canal使用总结

上面为一个canal的简单使用。具体详细使用可以查看Canal的gitHub项目地址

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5693340.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存