springboot+数据同步框架canal,将mysql的数据同步到多个地方,比如其他库,redis,es,mq等

springboot+数据同步框架canal,将mysql的数据同步到多个地方,比如其他库,redis,es,mq等,第1张

springboot+数据同步框架canal,将mysql的数据同步到多个地方,比如其他库,redis,es,mq等

目前正在做一个项目,人力资源方面的,大部分业务都与用户表息息相关,用户服务和其他各个服务相互独立,所以无法进行关联查询。并且实际业务需要,很多都需要去关联到用户表,所以比较纠结,故有如下方案:

方案一:

将用户表经常使用到的字段冗余到各个业务表,用户信息修改之后,发送一个消息到mq,然后涉及到冗余用户字段的服务去订阅这个队列,然后进行修改冗余字段数据,其实这个方法也不错。
优点:此方案其他业务涉及到用户数据的不用进行关联查询,效率高
缺点:冗余字段很难维护,冗余度会随着业务不断的扩张而扩张,如果增加某个字段,很容易漏掉

方案二:

利用canal,单独抽离出一个服务,此服务只用户将用户服务的用户数据,同步到其他服务对应所拆分的库,此服务采用多数据源的循环的去维护各个库中的用户表。
优点:单独抽离出一个服务,方便维护与管理,可扩展性较高,如果增加分库的服务,则增加一个数据源即可,代码基本无需改动。
缺点:每一个库都要建立一个用户表,比较浪费资源,并且会使用的关联查询。

结合到代码的可扩展性和可维护性,最终考虑使用canal数据同步来实现。(本人比较懒,不想干那种重复性的工作,所以不想一样的代码,写n次。哈哈)

安装参照:安装直通车

一、首先安装canal

下载地址:https://github.com/alibaba/canal/releases
这里我选择的是1.1.4版本

点击直接下载。
如果地址无法访问,请加微信:osm164502,我可私发。

2、解压,修改配置文件

修改:安装目录/conf/example/instance.properties文件对应的位置

# position info
canal.instance.master.address=127.0.0.1:3306

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

修改startup.bat的启动脚本

3、配置mysql

找到my.ini文件

我的配置文件windows在:C:ProgramDataMySQLMySQL Server 5.7my.ini,如果你的也是5.7,也是在这个位置。
打开文件,找到:Binary Logging.
新增如下配置:

log-bin=mysql-bin     #binlog文件名
binlog_format=ROW     #选择row模式

找到Server Id,设置server-id,

server-id=1 # mysql实例id,不能和canal的slaveId重复

重启mysql服务

新增用户canal
这个用户名可以改成你想改的,这里的用户密码需要和instance.properties里面的用户名密码对应!

-- 使用命令登录:mysql -u root -p
-- 创建用户 用户名:canal 密码:canal
create user 'canal'@'%' identified by 'canal';
-- 授权 *.*表示所有库
grant SELECt, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'canal';
4、启动,双击startup.bat

到此canal服务就安装好了,然后根据我们的实际业务处理来了。

单独抽离一个springboot服务,来处理即可 1、新建一个springboot项目,pom文件如下:
	
	    org.springframework.boot
	    spring-boot-starter-parent
	    2.3.1.RELEASE
	

    
    	
            com.alibaba.otter
            canal.client
            1.1.4
        
        
        
            org.springframework.boot
            spring-boot-starter-web
        

        
            org.springframework.boot
            spring-boot-starter-jdbc
        


        
            com.alibaba
            druid-spring-boot-starter
            1.1.10
        

        
            mysql
            mysql-connector-java
            runtime
        

        
            org.projectlombok
            lombok
            1.16.18
            provided
        

        
            com.baomidou
            mybatis-plus-boot-starter
            3.3.1
            
                
                    com.baomidou
                    mybatis-plus-generator
                
            
        


    
2、配置文件
server:
  port: 9999
spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      work-server:
        #MySQL配置
        driverClassName: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/db1?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
        username: root
        password: root
      partner-server:
        #MySQL配置
        driverClassName: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/db2?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
        username: root
        password: root

# mybatis-plus配置
mybatis-plus:
  mapperLocations: classpath:mapper/*Mapper.xml
  configuration:
    lazy-loading-enabled: true
    aggressive-lazy-loading: false
    map-underscore-to-camel-case: true
  type-aliases-package: com.xxx.entity

xfr:
  canal:
    host: 127.0.0.1
    port: 11111
    subscribe: db.user_info
3、启动类
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

@MapperScan("com.xxx.mapper") // mybatis的包扫描配置
@SpringBootApplication(exclude= {DataSourceAutoConfiguration.class})  // 排除数据源自动配置,因为我们要手动配置多个数据源,需要同步到多个库中
public class CanalApplication {

    public static void main(String[] args) {
        SpringApplication.run(CanalApplication.class, args);
    }

}
4、CanalConfig
package com.xfr.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import com.xfr.handler.DataHandler;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.net.InetSocketAddress;
import java.util.List;


@Configuration
@ConfigurationProperties(prefix = "xfr.canal")
public class CanalConfig implements InitializingBean {


    
    @Autowired
    private List handlerList;

    @Value("${xfr.canal.host}")
    private String host;
    @Value("${xfr.canal.port}")
    private Integer port;
    @Value("${xfr.canal.subscribe}")
    private String subscribe;

    @Override
    public void afterPropertiesSet() throws Exception {
        // 创建链接
        InetSocketAddress inetSocketAddress = new InetSocketAddress(host, port);
        CanalConnector connector = CanalConnectors.newSingleConnector(inetSocketAddress, "example", "", "");
        try {
            //打开连接
            connector.connect();
            //订阅数据库表,全部表
            connector.subscribe(subscribe);
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(200);
                //获取批量ID
                long batchId = message.getId();
                //获取批量的数量
                int size = message.getEntries().size();
                //如果没有数据
                if (batchId == -1 || size == 0) {
                    try {
                        //线程休眠2秒
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    //如果有数据,处理数据
                    for (DataHandler dataHandler : handlerList) {
                        dataHandler.doHandler(message.getEntries());
                    }
                }
                //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }


}

4、动态数据源配置
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;


public class DynamicDataSource  extends AbstractRoutingDataSource {

    private static final ThreadLocal contextHolder = new ThreadLocal<>();

    public static Map allDataSources = new HashMap<>();

    public DynamicDataSource(DataSource defaultTargetDataSource, Map targetDataSources) {
        super.setDefaultTargetDataSource(defaultTargetDataSource);
        super.setTargetDataSources(targetDataSources);
        super.afterPropertiesSet();
        allDataSources = targetDataSources;
    }

    @Override
    protected Object determineCurrentLookupKey() {
        return getDataSource();
    }

    public static void setDataSource(String dataSource) {
        contextHolder.set(dataSource);
    }

    public static String getDataSource() {
        return contextHolder.get();
    }

    public static void clearDataSource() {
        contextHolder.remove();
    }
}

import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
@Component
public class DynamicDataSourceConfig {

    @Bean
    @ConfigurationProperties("spring.datasource.druid.work-server")
    public DataSource workServerDataSource(){
        return DruidDataSourceBuilder.create().build();
    }

    @Bean
    @ConfigurationProperties("spring.datasource.druid.partner-server")
    public DataSource partnerServerDataSource(){
        return DruidDataSourceBuilder.create().build();
    }

    @Bean
    @Primary
    public DynamicDataSource dataSource(DataSource workServerDataSource, DataSource partnerServerDataSource) {
        Map targetDataSources = new HashMap<>();
        targetDataSources.put("workServer",workServerDataSource);
        targetDataSources.put("partnerServer", partnerServerDataSource);
        return new DynamicDataSource(workServerDataSource, targetDataSources);
    }
}
4、数据处理器
import com.alibaba.otter.canal.protocol.CanalEntry;

import java.util.List;


public interface DataHandler {
    
    void doHandler(List entrys) throws Exception;
}

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.common.base.Objects;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.xfr.datasource.DynamicDataSource;
import com.xfr.entity.UserInfoCopy;
import com.xfr.service.impl.UserInfoServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.List;


@Service
public class SyncUserInfoHandler implements DataHandler {

    @Autowired
    private UserInfoServiceImpl userInfoService;

    @Override
    public void doHandler(List entrys) throws InvalidProtocolBufferException {
        for (CanalEntry.Entry entry : entrys) {
            String tableName = entry.getHeader().getTableName();
            CanalEntry.EntryType entryType = entry.getEntryType();
            ByteString storevalue = entry.getStorevalue();
            if (Objects.equal(entryType, CanalEntry.EntryType.ROWDATA)) {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storevalue);
                List userInfoList = new ArrayList<>();
                // 删除语句
                if (Objects.equal(rowChange.getEventType(), CanalEntry.EventType.DELETE)) {
                    List delIds = new ArrayList<>();
                    genDelIds(rowChange, delIds);
                    userInfoService.removeByIds(delIds);
                } else if (Objects.equal(rowChange.getEventType(), CanalEntry.EventType.INSERT)) {
                    genUserInfoList(rowChange, rowChange.getEventType(), userInfoList);
                    saveBatch(userInfoList);
                } else if (Objects.equal(rowChange.getEventType(), CanalEntry.EventType.UPDATE)) {
                    genUserInfoList(rowChange, rowChange.getEventType(), userInfoList);
                    if (!CollectionUtils.isEmpty(userInfoList)) {
                        userInfoService.updateBatchById(userInfoList);
                    }
                }
                userInfoList.forEach(item -> {
                    System.out.println(item);
                });
            } else {
                System.out.println("当前 *** 作类型为: " + entryType);
            }
        }
    }

    private void saveBatch(List userInfoList) {
        for (Object key : DynamicDataSource.allDataSources.keySet()) {
            System.out.println("Key = " + key);
            DynamicDataSource.setDataSource((String) key);
            if (!CollectionUtils.isEmpty(userInfoList)) {
                userInfoService.saveBatch(userInfoList);
            }
        }
        DynamicDataSource.clearDataSource();
    }

    private void genDelIds(CanalEntry.RowChange rowChange, List delIds) {
        row:for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            column:for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                if (Objects.equal(column.getName(), "id")) {
                    delIds.add(Integer.valueOf(column.getValue()));
                    continue column;
                }
            }
        }
    }

    private void genUserInfoList(CanalEntry.RowChange rowChange, CanalEntry.EventType eventType, List userInfoList) {
        if (eventType.equals(CanalEntry.EventType.UPDATE)) {
            genUpdate(rowChange, userInfoList);
        } else if (eventType.equals(CanalEntry.EventType.INSERT)) {
            genInsert(rowChange, userInfoList);
        }
    }

    public void genUpdate(CanalEntry.RowChange rowChange, List userInfoList) {
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            UserInfoCopy userInfo = new UserInfoCopy();
            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                if (Objects.equal(column.getName(), "id")) {
                    userInfo.setId(Integer.valueOf(column.getValue()));
                } else if (Objects.equal(column.getName(), "name") && column.getUpdated()) {
                    userInfo.setName(column.getValue());
                } else if (Objects.equal(column.getName(), "account") && column.getUpdated()) {
                    userInfo.setAccount(column.getValue());
                } else if (Objects.equal(column.getName(), "password") && column.getUpdated()) {
                    userInfo.setPassword(column.getValue());
                } else if (Objects.equal(column.getName(), "phone") && column.getUpdated()) {
                    userInfo.setPhone(column.getValue());
                } else if (Objects.equal(column.getName(), "status") && column.getUpdated()) {
                    userInfo.setStatus(Integer.valueOf(column.getValue()));
                }
            }
            userInfoList.add(userInfo);
        }
    }

    public void genInsert(CanalEntry.RowChange rowChange, List userInfoList) {
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            UserInfoCopy userInfo = new UserInfoCopy();
            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                if (Objects.equal(column.getName(), "id")) {
                    userInfo.setId(Integer.valueOf(column.getValue()));
                } else if (Objects.equal(column.getName(), "name")) {
                    userInfo.setName(column.getValue());
                } else if (Objects.equal(column.getName(), "account")) {
                    userInfo.setAccount(column.getValue());
                } else if (Objects.equal(column.getName(), "password")) {
                    userInfo.setPassword(column.getValue());
                } else if (Objects.equal(column.getName(), "phone")) {
                    userInfo.setPhone(column.getValue());
                }
            }
            userInfoList.add(userInfo);
        }
    }
}

实际真实业务处理在SyncUserInfoHandler类,如果需要增加其他扩展,只需要实现DataHandler接口,并实现doHandler方法即可。
源码地址

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5716418.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存