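I'm currently working on a human-resources project. Most of the business logic is closely tied to the user table, but the user service and the other services are independent of one another, so cross-service join queries are not possible. Because many features still need to be associated with user data, I weighed the following two options: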
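Option 1:
Duplicate the frequently used user-table fields into each business table. When user information changes, the user service sends a message to an MQ; every service that holds redundant user fields subscribes to that queue and updates its own copies. This approach works reasonably well.
Pros: business code that needs user data does not have to run join queries, so reads are efficient.
Cons: redundant fields are hard to maintain, and the amount of redundancy grows as the business grows. When a new field is added, it is easy to miss one of the copies.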
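To make Option 1 concrete, here is a minimal in-memory sketch of what a subscriber might do when a user-changed message arrives. The `UserChanged` message, the `LeaveRequest` row, and the method names are all hypothetical and stand in for a real MQ listener and a real business table; no MQ client is involved.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RedundantFieldSync {

    /** Hypothetical message the user service publishes after a user row changes. */
    static final class UserChanged {
        final int userId;
        final String name;

        UserChanged(int userId, String name) {
            this.userId = userId;
            this.name = name;
        }
    }

    /** Hypothetical business row that carries a redundant copy of the user's name. */
    static final class LeaveRequest {
        final int userId;
        String userName;

        LeaveRequest(int userId, String userName) {
            this.userId = userId;
            this.userName = userName;
        }
    }

    // In-memory stand-in for a business table, keyed by row id.
    private final Map<Integer, LeaveRequest> table = new LinkedHashMap<>();

    void insert(int rowId, int userId, String userName) {
        table.put(rowId, new LeaveRequest(userId, userName));
    }

    /** What an MQ listener would call; returns how many rows were refreshed. */
    int onUserChanged(UserChanged msg) {
        int updated = 0;
        for (LeaveRequest row : table.values()) {
            if (row.userId == msg.userId) {
                row.userName = msg.name;
                updated++;
            }
        }
        return updated;
    }

    String userNameOf(int rowId) {
        return table.get(rowId).userName;
    }

    public static void main(String[] args) {
        RedundantFieldSync svc = new RedundantFieldSync();
        svc.insert(1, 42, "Old Name");
        svc.insert(2, 7, "Someone Else");
        int n = svc.onUserChanged(new UserChanged(42, "New Name"));
        System.out.println(n + " row(s) updated, row 1 now holds " + svc.userNameOf(1));
    }
}
```

The maintenance cost mentioned above shows up here directly: every new redundant field needs a new field in the message and a new assignment in every subscriber.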
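Option 2:
Use canal and split out a dedicated service whose only job is to sync the user service's user data into the databases of the other services. This service is configured with multiple data sources and loops over them to maintain the user table in each database.
Pros: a separate service is easy to maintain and manage and scales well. When another service with its own database is added, you only add one more data source, and the code hardly needs to change.
Cons: every database needs its own copy of the user table, which wastes some resources, and queries still rely on joins (now local to each database).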
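The core of Option 2 is a fan-out: each change captured from the source user table is applied to every registered copy. The following is a small sketch of that idea using in-memory maps instead of real databases; `UserTableFanOut`, `addReplica`, and `apply` are hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class UserTableFanOut {

    enum Type { INSERT, UPDATE, DELETE }

    // One in-memory "user table" (id -> name) per target database.
    private final Map<String, Map<Integer, String>> replicas = new LinkedHashMap<>();

    /** Registering a new target is all that is needed when a new database appears. */
    void addReplica(String name) {
        replicas.put(name, new HashMap<>());
    }

    /** Applies one change event to every registered replica. */
    void apply(Type type, int id, String name) {
        for (Map<Integer, String> table : replicas.values()) {
            switch (type) {
                case INSERT:
                case UPDATE:
                    table.put(id, name);
                    break;
                case DELETE:
                    table.remove(id);
                    break;
            }
        }
    }

    String lookup(String replica, int id) {
        return replicas.get(replica).get(id);
    }

    public static void main(String[] args) {
        UserTableFanOut sync = new UserTableFanOut();
        sync.addReplica("workServer");
        sync.addReplica("partnerServer");
        sync.apply(Type.INSERT, 1, "Alice");
        sync.apply(Type.UPDATE, 1, "Alice B.");
        System.out.println(sync.lookup("partnerServer", 1));
    }
}
```

Note that a replica registered later only receives changes from that point on; existing rows would need a separate initial copy, which this sketch does not cover.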
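Installation reference: installation quick guide
I. First install canal. Download page: https://github.com/alibaba/canal/releases
I chose version 1.1.4 here.
Click the release asset to download it directly.
If the address cannot be reached, add me on WeChat (osm164502) and I can send the file privately.
Edit the corresponding entries in <install dir>/conf/example/instance.properties: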
# position info
canal.instance.master.address=127.0.0.1:3306
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
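Modify the startup.bat startup script.
Find the my.ini file.
On Windows my configuration file is at C:\ProgramData\MySQL\MySQL Server 5.7\my.ini. If you are also running 5.7, yours should be in the same place.
Open the file and find the section: Binary Logging.
Add the following settings: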
log-bin=mysql-bin    # binlog file name
binlog_format=ROW    # use ROW mode
Find Server Id and set server-id:
server-id=1    # MySQL instance id; must not be the same as canal's slaveId
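Because canal depends on all three of these settings, a quick sanity check on the file can save a restart cycle. The sketch below is a simplified option-file reader written only for this check; the class and method names are hypothetical, and it deliberately ignores quoting and include directives.

```java
import java.util.HashMap;
import java.util.Map;

class BinlogConfigCheck {

    /** Parses key=value lines; strips '#' comments and skips [section] headers. */
    static Map<String, String> parse(String ini) {
        Map<String, String> options = new HashMap<>();
        for (String raw : ini.split("\\R")) {
            String line = raw;
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash);
            }
            line = line.trim();
            if (line.isEmpty() || line.startsWith("[") || line.startsWith(";")) {
                continue;
            }
            int eq = line.indexOf('=');
            if (eq < 0) {
                continue;
            }
            // MySQL accepts '-' and '_' interchangeably in option names, so normalize.
            String key = line.substring(0, eq).trim().replace('-', '_');
            options.put(key, line.substring(eq + 1).trim());
        }
        return options;
    }

    /** True when binlog is on, the format is ROW, and a server id is set. */
    static boolean readyForCanal(String ini) {
        Map<String, String> o = parse(ini);
        return o.containsKey("log_bin")
                && "ROW".equalsIgnoreCase(o.get("binlog_format"))
                && o.containsKey("server_id");
    }

    public static void main(String[] args) {
        String ini = "[mysqld]\n"
                + "log-bin=mysql-bin # binlog file name\n"
                + "binlog_format=ROW\n"
                + "server-id=1\n";
        System.out.println("ready for canal: " + readyForCanal(ini));
    }
}
```

Once MySQL has been restarted, the effective values can also be confirmed from a client with `show variables like 'log_bin';` and `show variables like 'binlog_format';`.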
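Restart the MySQL service.
Create a new user named canal.
You can change the user name to whatever you like, but the user name and password here must match the ones in instance.properties!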
-- Log in with: mysql -u root -p
-- Create the user (user name: canal, password: canal)
create user 'canal'@'%' identified by 'canal';
-- Grant privileges; *.* means all databases
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'canal';
Start canal by double-clicking startup.bat.
At this point the canal server is installed. What remains is to process the change events according to our actual business needs.
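Split out a separate Spring Boot service to do the processing.
1. Create a new Spring Boot project with the following pom file:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.1.RELEASE</version>
</parent>
<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
        <version>1.1.10</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.18</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.3.1</version>
    </dependency>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-generator</artifactId>
        <!-- version not shown in the source -->
    </dependency>
</dependencies>
2. Configuration file: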
server:
  port: 9999
spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      work-server:
        # MySQL settings
        driverClassName: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/db1?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
        username: root
        password: root
      partner-server:
        # MySQL settings
        driverClassName: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/db2?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
        username: root
        password: root
# mybatis-plus settings
mybatis-plus:
  mapperLocations: classpath:mapper/*Mapper.xml
  configuration:
    lazy-loading-enabled: true
    aggressive-lazy-loading: false
    map-underscore-to-camel-case: true
  type-aliases-package: com.xxx.entity
xfr:
  canal:
    host: 127.0.0.1
    port: 11111
    subscribe: db.user_info
3. Startup class:
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

// MyBatis mapper package scan
@MapperScan("com.xxx.mapper")
// Exclude the data source auto-configuration: several data sources are configured
// by hand because the data has to be synced into multiple databases.
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class CanalApplication {

    public static void main(String[] args) {
        SpringApplication.run(CanalApplication.class, args);
    }
}
4. CanalConfig:
package com.xfr.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;
import com.xfr.handler.DataHandler;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.net.InetSocketAddress;
import java.util.List;

@Configuration
@ConfigurationProperties(prefix = "xfr.canal")
public class CanalConfig implements InitializingBean {

    // Spring injects every bean that implements DataHandler.
    @Autowired
    private List<DataHandler> handlerList;

    @Value("${xfr.canal.host}")
    private String host;

    @Value("${xfr.canal.port}")
    private Integer port;

    @Value("${xfr.canal.subscribe}")
    private String subscribe;

    @Override
    public void afterPropertiesSet() throws Exception {
        // Create the connection to the canal server ("example" is the instance name).
        InetSocketAddress inetSocketAddress = new InetSocketAddress(host, port);
        CanalConnector connector = CanalConnectors.newSingleConnector(inetSocketAddress, "example", "", "");
        try {
            // Open the connection
            connector.connect();
            // Subscribe using the filter configured in xfr.canal.subscribe
            connector.subscribe(subscribe);
            // Roll back to the last un-acked position so the next fetch resumes from there
            connector.rollback();
            // Note: this loop never returns, so it blocks the thread that calls afterPropertiesSet().
            while (true) {
                // Fetch up to 200 entries without acknowledging them
                Message message = connector.getWithoutAck(200);
                // Batch id
                long batchId = message.getId();
                // Number of entries in the batch
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // No data: sleep for 2 seconds
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    // Data available: pass it to every handler
                    for (DataHandler dataHandler : handlerList) {
                        dataHandler.doHandler(message.getEntries());
                    }
                }
                // Acknowledge the batch; every Message with an id <= batchId is confirmed.
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }
}
5. Dynamic data source configuration:
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

public class DynamicDataSource extends AbstractRoutingDataSource {

    // Holds the key of the data source selected for the current thread.
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();

    public static Map
    // (the remainder of this class is cut off in the source)
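Since the listing above is cut off, here is a dependency-free sketch of the routing idea it relies on: a thread-local key selects one of several targets, with a default used when no key (or an unknown key) is set. This is not the author's class and does not use Spring; `RoutingSketch` and its string "targets" are hypothetical stand-ins for `AbstractRoutingDataSource` and real `DataSource` objects, while `setDataSource` and `clearDataSource` mirror the method names the handler code calls later.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RoutingSketch {

    // Key of the target selected for the current thread (null means "use the default").
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    private final String defaultTarget;
    private final Map<String, String> targets = new LinkedHashMap<>();

    RoutingSketch(String defaultTarget, Map<String, String> targets) {
        this.defaultTarget = defaultTarget;
        this.targets.putAll(targets);
    }

    static void setDataSource(String key) {
        CONTEXT.set(key);
    }

    static void clearDataSource() {
        CONTEXT.remove();
    }

    /** Resolves the target for the calling thread, falling back to the default. */
    String current() {
        String key = CONTEXT.get();
        String target = (key == null) ? null : targets.get(key);
        return (target == null) ? defaultTarget : target;
    }

    public static void main(String[] args) {
        Map<String, String> targets = new LinkedHashMap<>();
        targets.put("workServer", "jdbc:db1");
        targets.put("partnerServer", "jdbc:db2");
        RoutingSketch routing = new RoutingSketch("jdbc:db1", targets);

        for (String key : targets.keySet()) {
            RoutingSketch.setDataSource(key);
            System.out.println(key + " -> " + routing.current());
        }
        RoutingSketch.clearDataSource();
    }
}
```

Clearing the key when the work is done matters in practice: threads are usually pooled, so a key left behind would silently route a later, unrelated operation to the wrong database.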
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class DynamicDataSourceConfig {

    @Bean
    @ConfigurationProperties("spring.datasource.druid.work-server")
    public DataSource workServerDataSource() {
        return DruidDataSourceBuilder.create().build();
    }

    @Bean
    @ConfigurationProperties("spring.datasource.druid.partner-server")
    public DataSource partnerServerDataSource() {
        return DruidDataSourceBuilder.create().build();
    }

    @Bean
    @Primary
    public DynamicDataSource dataSource(DataSource workServerDataSource, DataSource partnerServerDataSource) {
        // Keys used here are the ones passed to DynamicDataSource.setDataSource(...)
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put("workServer", workServerDataSource);
        targetDataSources.put("partnerServer", partnerServerDataSource);
        // workServerDataSource doubles as the default data source
        return new DynamicDataSource(workServerDataSource, targetDataSources);
    }
}
6. Data handlers:
import com.alibaba.otter.canal.protocol.CanalEntry;

import java.util.List;

public interface DataHandler {

    // Called once per fetched batch with all entries of that batch.
    void doHandler(List<CanalEntry.Entry> entrys) throws Exception;
}
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.common.base.Objects;
import com.google.protobuf.InvalidProtocolBufferException;
import com.xfr.datasource.DynamicDataSource;
import com.xfr.entity.UserInfoCopy;
import com.xfr.service.impl.UserInfoServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.List;

@Service
public class SyncUserInfoHandler implements DataHandler {

    @Autowired
    private UserInfoServiceImpl userInfoService;

    @Override
    public void doHandler(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {
        for (CanalEntry.Entry entry : entrys) {
            CanalEntry.EntryType entryType = entry.getEntryType();
            if (!Objects.equal(entryType, CanalEntry.EntryType.ROWDATA)) {
                System.out.println("Current entry type: " + entryType);
                continue;
            }
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            CanalEntry.EventType eventType = rowChange.getEventType();
            List<UserInfoCopy> userInfoList = new ArrayList<>();
            if (Objects.equal(eventType, CanalEntry.EventType.DELETE)) {
                // DELETE statement
                List<Integer> delIds = new ArrayList<>();
                genDelIds(rowChange, delIds);
                if (!delIds.isEmpty()) {
                    forEachDataSource(() -> userInfoService.removeByIds(delIds));
                }
            } else if (Objects.equal(eventType, CanalEntry.EventType.INSERT)) {
                genUserInfoList(rowChange, eventType, userInfoList);
                if (!CollectionUtils.isEmpty(userInfoList)) {
                    forEachDataSource(() -> userInfoService.saveBatch(userInfoList));
                }
            } else if (Objects.equal(eventType, CanalEntry.EventType.UPDATE)) {
                genUserInfoList(rowChange, eventType, userInfoList);
                if (!CollectionUtils.isEmpty(userInfoList)) {
                    forEachDataSource(() -> userInfoService.updateBatchById(userInfoList));
                }
            }
            userInfoList.forEach(System.out::println);
        }
    }

    // Runs the action once per configured data source. The original code only looped
    // over the data sources for inserts; deletes and updates now use the same loop so
    // that every copy of the user table stays in sync.
    private void forEachDataSource(Runnable action) {
        try {
            for (Object key : DynamicDataSource.allDataSources.keySet()) {
                System.out.println("Key = " + key);
                DynamicDataSource.setDataSource((String) key);
                action.run();
            }
        } finally {
            // Always reset the thread-local key, even if one of the writes fails.
            DynamicDataSource.clearDataSource();
        }
    }

    // Collects the primary keys of deleted rows from the "before" image.
    private void genDelIds(CanalEntry.RowChange rowChange, List<Integer> delIds) {
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                if (Objects.equal(column.getName(), "id")) {
                    delIds.add(Integer.valueOf(column.getValue()));
                    break;
                }
            }
        }
    }

    private void genUserInfoList(CanalEntry.RowChange rowChange, CanalEntry.EventType eventType,
                                 List<UserInfoCopy> userInfoList) {
        if (eventType.equals(CanalEntry.EventType.UPDATE)) {
            genUpdate(rowChange, userInfoList);
        } else if (eventType.equals(CanalEntry.EventType.INSERT)) {
            genInsert(rowChange, userInfoList);
        }
    }

    // For updates, only columns flagged as updated are copied (plus the id).
    public void genUpdate(CanalEntry.RowChange rowChange, List<UserInfoCopy> userInfoList) {
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            UserInfoCopy userInfo = new UserInfoCopy();
            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                if (Objects.equal(column.getName(), "id")) {
                    userInfo.setId(Integer.valueOf(column.getValue()));
                } else if (Objects.equal(column.getName(), "name") && column.getUpdated()) {
                    userInfo.setName(column.getValue());
                } else if (Objects.equal(column.getName(), "account") && column.getUpdated()) {
                    userInfo.setAccount(column.getValue());
                } else if (Objects.equal(column.getName(), "password") && column.getUpdated()) {
                    userInfo.setPassword(column.getValue());
                } else if (Objects.equal(column.getName(), "phone") && column.getUpdated()) {
                    userInfo.setPhone(column.getValue());
                } else if (Objects.equal(column.getName(), "status") && column.getUpdated()) {
                    userInfo.setStatus(Integer.valueOf(column.getValue()));
                }
            }
            userInfoList.add(userInfo);
        }
    }

    // For inserts, the listed columns are copied from the "after" image.
    public void genInsert(CanalEntry.RowChange rowChange, List<UserInfoCopy> userInfoList) {
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            UserInfoCopy userInfo = new UserInfoCopy();
            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                if (Objects.equal(column.getName(), "id")) {
                    userInfo.setId(Integer.valueOf(column.getValue()));
                } else if (Objects.equal(column.getName(), "name")) {
                    userInfo.setName(column.getValue());
                } else if (Objects.equal(column.getName(), "account")) {
                    userInfo.setAccount(column.getValue());
                } else if (Objects.equal(column.getName(), "password")) {
                    userInfo.setPassword(column.getValue());
                } else if (Objects.equal(column.getName(), "phone")) {
                    userInfo.setPhone(column.getValue());
                }
            }
            userInfoList.add(userInfo);
        }
    }
}
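The two near-identical if/else chains in genUpdate and genInsert could, as one possible alternative, be driven by a lookup table from column name to setter. The sketch below shows that idea with plain Java types only: `Column` and `User` are hypothetical stand-ins for `CanalEntry.Column` and `UserInfoCopy`, and only a few fields are included.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

class ColumnMappingSketch {

    /** Hypothetical stand-in for CanalEntry.Column: name, string value, updated flag. */
    static final class Column {
        final String name;
        final String value;
        final boolean updated;

        Column(String name, String value, boolean updated) {
            this.name = name;
            this.value = value;
            this.updated = updated;
        }
    }

    /** Hypothetical stand-in for UserInfoCopy. */
    static final class User {
        Integer id;
        String name;
        String phone;
        Integer status;
    }

    // Column name -> how to copy its value onto the entity.
    private static final Map<String, BiConsumer<User, String>> SETTERS = new LinkedHashMap<>();

    static {
        SETTERS.put("name", (u, v) -> u.name = v);
        SETTERS.put("phone", (u, v) -> u.phone = v);
        SETTERS.put("status", (u, v) -> u.status = Integer.valueOf(v));
    }

    /** Builds a User from the "after" columns; onlyUpdated=true mimics the update path. */
    static User toUser(List<Column> afterColumns, boolean onlyUpdated) {
        User user = new User();
        for (Column column : afterColumns) {
            if ("id".equals(column.name)) {
                // The id is always needed to address the row.
                user.id = Integer.valueOf(column.value);
                continue;
            }
            BiConsumer<User, String> setter = SETTERS.get(column.name);
            if (setter != null && (!onlyUpdated || column.updated)) {
                setter.accept(user, column.value);
            }
        }
        return user;
    }

    public static void main(String[] args) {
        List<Column> after = Arrays.asList(
                new Column("id", "7", false),
                new Column("name", "Ann", true),
                new Column("phone", "123", false));
        User forUpdate = toUser(after, true);
        System.out.println(forUpdate.id + " " + forUpdate.name + " " + forUpdate.phone);
    }
}
```

With this shape, syncing a newly added column means adding one entry to the table rather than editing two methods.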
The actual business processing lives in the SyncUserInfoHandler class. To add further extensions, you only need to implement the DataHandler interface and its doHandler method.
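The extension point works because every registered handler receives every batch. Below is a minimal sketch of that dispatch pattern without Spring or canal: the `Handler` interface is a simplified, hypothetical version of DataHandler (entries are plain strings and no checked exception is declared), and handlers are registered by hand where the post relies on Spring injecting all DataHandler beans into a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class HandlerDispatchSketch {

    /** Simplified, hypothetical counterpart of DataHandler. */
    interface Handler {
        void doHandler(List<String> entries);
    }

    private final List<Handler> handlers = new ArrayList<>();

    /** Adding a new kind of processing only requires registering another handler. */
    void register(Handler handler) {
        handlers.add(handler);
    }

    /** Passes the same batch to every registered handler, in registration order. */
    void dispatch(List<String> entries) {
        for (Handler handler : handlers) {
            handler.doHandler(entries);
        }
    }

    public static void main(String[] args) {
        HandlerDispatchSketch dispatcher = new HandlerDispatchSketch();
        dispatcher.register(entries -> System.out.println("sync handler got " + entries.size() + " entries"));
        dispatcher.register(entries -> System.out.println("audit handler got " + entries.size() + " entries"));
        dispatcher.dispatch(Arrays.asList("insert user 1", "update user 2"));
    }
}
```

Because each handler sees all entries of the batch, a handler meant for one table should check the table name of each entry itself before acting on it.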
Source code link