阿里的数据同步神器——Canal

阿里的数据同步神器——Canal,第1张

Canal介绍

Canal是阿里巴巴的数据同步工具,最初主要为了应对杭州和美国的双机房部署问题,目前也是国内互联网企业经常使用的数据增量同步解决方案。

原理:

canal将自己伪装为MySQL的slave,向master发送dump协议master收到dump协议,数据发生修改后推送binary log给canalcanal解析binary log对象,转换为增量数据,同步到ES、Redis等 Canal 安装

MySQL配置

注:本案例的mysql在windows上,linux环境的配置没有太大区别

首先要让mysql开启binlog模式

1) 进入mysql查看是否启动binlog

SHOW VARIABLES LIKE '%log_bin%'

log_bin为ON表示启动,为OFF则未启动,需要修改mysql配置文件启动log_bin

windows配置文件是MySQL安装目录的my.ini

linux在/etc/my.cnf

修改:

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

2) 创建用户

进入mysql,创建canal用户并授权

create user canal@'%'IDENTIFIED WITH mysql_native_password BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

下载和安装canal

到官网下载 https://github.com/alibaba/canal/releases

这里使用的是1.1.4版本

上传文件到Linux,解压到canal目录中

cd /usr/local
mkdir canal
tar -vxf canal.deployer-1.1.4.tar.gz -C canal

配置Canal

进入mysql,输入命令,记录文件名和位置

show master status;

进入canal目录,修改配置文件

vi conf/example/instance.properties

启动Canal

进入bin目录启动服务

./startup.sh

关闭服务使用 stop.sh

查看启动日志文件

cat /usr/local/canal/logs/canal/canal.log
cat /usr/local/canal/logs/example/example.log

以上效果表示已经运行,如果出现异常可以按日志情况解决

主要问题总结:

异常信息 authentication error,数据库账号和密码配置错误异常信息 can’t find position,检查配置的文件名和位置,再删除conf/example/meta.dat 重启客户端版本兼容问题,canal的版本和客户端的版本要一致 Canal 客户端 官方客户端

1) 引入依赖


    com.alibaba.otter
    canal.client
    1.1.4

2) Java代码

package com.blb.canal_demo;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * 客户端测试
 */
public class ClientTest {
    public static void main(String args[]) {
        // 创建canal连接对象
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.223.223",
                11111), "example", "canal", "canal");
        try {
            //连接
            connector.connect();
            //订阅所有数据库和表
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(1000);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    //没有数据,就休眠1秒
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    //有数据就打印
                    printEntry(message.getEntries());
                }
                // 提交确认
                connector.ack(batchId);
            }
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN ||
                    entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR parse data:" + entry.toString(),e);
            }
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (RowData rowData : rowChage.getRowDatasList()) {
                //判断增删改 *** 作
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}

修改了数据库中任意一张表的数据,canal客户端监听到mysql数据的修改

第三方客户端

官方客户端的代码比较繁琐,这里使用了第三方客户端采用SpringBoot整合,使用比较简单

https://github.com/chenqian56131/spring-boot-starter-canal

1) 引入依赖

首先下载该开源项目,安装到本地的maven中,在项目中就可以使用该依赖


    com.xpand
    starter-canal
    0.0.1-SNAPSHOT

2) 启动类添加注解

@EnableCanalClient

3)配置文件

canal.client.instances.example.host=192.168.223.223
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000

4) 监听器

package com.blb.canal_demo;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xpand.starter.canal.annotation.CanalEventListener;
import com.xpand.starter.canal.annotation.ListenPoint;

/**
 * 事件监听器
 */
@CanalEventListener
public class CanalListener {

    /**
     * 监听 erp数据库的customer表
     */
    @ListenPoint(schema = "erp",table = "customer")
    public void updateData(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
        System.out.println("修改前");
        //打印改变之前的数据
        rowData.getBeforeColumnsList().forEach((c)-> System.out.print(c.getName()+":"+c.getValue()+"\t"));
        System.out.println("\n修改后");
        //打印改变之后的数据
        rowData.getAfterColumnsList().forEach((c)-> System.out.print(c.getName()+":"+c.getValue()+"\t"));
    }
}

Canal+RabbitMQ实现数据增量同步

实际开发过程中,我们常使用Canal配合RabbitMQ实现MySQL和其它存储系统的增量同步,下面是分布式在线教育系统中实现数据库和Elasticsearch的同步过程

步骤:

课程微服务对MySQL中的课程数据库课程表进行增删改 *** 作,MySQL发送binlog给Canal数据同步微服务通过Canal监听器获得具体的数据,通过RabbitMQ发送给搜索微服务搜索微服务监听RabbitMQ消息,对Elasticsearch课程索引进行同步更新

课程表的增删改这里就不介绍了,主要看看同步服务的核心代码

依赖

    com.xpand
    starter-canal
    0.0.1-SNAPSHOT



    org.springframework.boot
    spring-boot-starter-amqp



    com.alibaba
    fastjson
    1.2.47

配置文件
server.port=8701
# canal配置
canal.client.instances.example.host=192.168.223.223
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
# rabbitMQ配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=myhost
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
MQ配置
/**
 * RabbitMQ的配置
 */
@Slf4j
@Configuration
public class RabbitMQConfig {

    public static final String QUEUE_COURSE_SAVE = "queue.course.save";
    public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
    public static final String KEY_COURSE_SAVE = "key.course.save";
    public static final String KEY_COURSE_REMOVE = "key.course.remove";
    public static final String COURSE_EXCHANGE = "edu.course.exchange";

    @Bean
    public Queue queueCourseSave() {
        return new Queue(QUEUE_COURSE_SAVE);
    }

    @Bean
    public Queue queueCourseRemove() {
        return new Queue(QUEUE_COURSE_REMOVE);
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(COURSE_EXCHANGE);
    }

    @Bean
    public Binding bindCourseSave() {
        return BindingBuilder.bind(queueCourseSave()).to(topicExchange()).with(KEY_COURSE_SAVE);
    }

    @Bean
    public Binding bindCourseRemove() {
        return BindingBuilder.bind(queueCourseRemove()).to(topicExchange()).with(KEY_COURSE_REMOVE);
    }
}
Canal监听器
/**
 * 课程表数据同步监听器
 */
@Slf4j
@CanalEventListener
public class CourseSyncListener {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 监听课程表的修改
     */
    @ListenPoint(schema = "edu_course",table = "course")
    public void handleCourseChange(EventType eventType, RowData rowData){
        log.info("course表 *** 作:{}",eventType);
        if(eventType == EventType.INSERT || eventType == EventType.UPDATE){
            //获得修改后的数据
            Map map = new HashMap<>();
            rowData.getAfterColumnsList().forEach(c -> {
                map.put(c.getName(),c.getValue());
            });
            String json = JSON.toJSONString(map);
            log.info("保存数据:{}",json);
            //发送给mq,通知搜索服务进行添加
            rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,
                RabbitMQConfig.KEY_COURSE_SAVE, json));
        }else if(eventType == EventType.DELETE){
            //获得删除前的id
            Long[] id = new Long[1];
            rowData.getBeforeColumnsList().forEach(c -> {
                if("id".equals(c.getName())){
                    id[0] = Long.valueOf(c.getValue());
                }
            });
            log.info("删除数据:{}",id[0]);
            //发送给mq,通知搜索服务进行删除
            rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,
                    RabbitMQConfig.KEY_COURSE_REMOVE, Long.valueOf(id[0]));
        }else{
            log.info("不支持其它 *** 作");
        }
    }
}

搜索服务的消息监听

@Slf4j
@Component
public class CourseMQListener {

    public static final String QUEUE_COURSE_SAVE = "queue.course.save";
    public static final String QUEUE_COURSE_REMOVE = "queue.course.remove";
    public static final String KEY_COURSE_SAVE = "key.course.save";
    public static final String KEY_COURSE_REMOVE = "key.course.remove";
    public static final String COURSE_EXCHANGE = "course.exchange";
    
    @Autowired
    ICourseService courseService;

    /**
     * 监听课程添加和更新 *** 作
     */
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = QUEUE_COURSE_SAVE, durable = "true"),
                    exchange = @Exchange(value = COURSE_EXCHANGE,
                            type = ExchangeTypes.TOPIC,
                            ignoreDeclarationExceptions = "true")
                    , key = KEY_COURSE_SAVE)})
    public void receiveCourseSaveMessage(String json, Channel channel, Message message) throws IOException {
        log.info("保存课程课程:{}",json);
        //将消息转为课程,保存到es中
        Course course = JSON.parseObject(json,Course.class);
        //保存课程到ElasticSearch中
        courseService.saveOrUpdate(course);
    }

    /**
     * 监听课程删除 *** 作
     */
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"),
                    exchange = @Exchange(value = COURSE_EXCHANGE,
                            type = ExchangeTypes.TOPIC,
                            ignoreDeclarationExceptions = "true")
                    , key = KEY_COURSE_REMOVE)})
    public void receiveCourseDeleteMessage(Long id) {
        courseService.removeById(id);
        log.info("课程删除完成:{}",id);
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/991364.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存