2.Canal工作原理 2.1 Mysql主从复制原理阿里巴巴 B2B 公司,因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所以衍生出了同步杭州和美国异地机房的需求,从 2010 年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前。Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)
1)Master 主库将改变记录,写到二进制日志(Binary Log)中;
2)Slave 从库向 MySQL Master 发送 dump 协议,将 Master 主库的 binary log events 拷贝
到它的中继日志(relay log);
3)Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。
略
详情见Maxwell简单使用2.2.1章节
3 Canal使用 3.1 Canal部署Canal需要部署到linux服务器上
- 上传canal.deployer-1.1.2.tar.gz包到服务器的/opt/software下
- 解压 tar -zxvf canal.deployer-1.1.2.tar.gz -C ./canal/ 目录下
- 配置canal.properties和instance.properties文件中内容
cancal.properties
instance.properties
4. 启动canal ./bin/startup.sh
- 创建一个java的maven工程
- pom.xml中添加如下配置
com.alibaba.otter canal.client1.1.2
- 创建一个CanalClient.java文件
- 内容如下
//连接服务端canal CanalConnector example = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.10.130", 11111), "example", "", ""); //开始循环订阅数据 while (true) { //开始连接 example.connect(); //订阅某个库或者表数据 example.subscribe("test-canal.*"); //获取变化数据 Message message = example.get(100); //拿到数据 Listentries = message.getEntries(); //判断是否存在数据 if (entries.size() <= 0) { log.info("没有数据"); Thread.sleep(1000L); } else { //开始遍历数据 for (CanalEntry.Entry e : entries) { String tableName = e.getHeader().getTableName(); CanalEntry.EntryType entryType = e.getEntryType(); if (CanalEntry.EntryType.ROWDATA.equals(entryType)) { ByteString storevalue = e.getStorevalue(); CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storevalue); List rowDatasList = rowChange.getRowDatasList(); for (CanalEntry.RowData row : rowDatasList) { Map beforeMap = new HashMap<>(1 << 4); List beforeColumnsList = row.getBeforeColumnsList(); for (CanalEntry.Column c : beforeColumnsList) { beforeMap.put(c.getName(), c.getValue()); } Map afterMap = new HashMap<>(1 << 4); List afterColumnsList = row.getAfterColumnsList(); for (CanalEntry.Column c : afterColumnsList) { afterMap.put(c.getName(), c.getValue()); } log.info("表名:{},之前数据:{},之后数据:{}", tableName, JSON.toJSON(beforeMap), JSON.toJSON(afterMap)); } } } } }
- 数据变化格式
上面为一个canal的简单使用。具体详细使用可以查看Canal的gitHub项目地址
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)