**/
@documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface DataSource {
String value() default DataSourceNames.ONE;
}
然后是创建 DataSourceNames.java,用于简单数据源命名:
/**
 * Logical names (lookup keys) of the data sources that can be selected at runtime.
 */
public interface DataSourceNames {

    /** Key of the first data source. */
    String ONE = "ONE";

    /** Key of the second data source. */
    String TWO = "TWO";
}
PS:其实这些代码都是我之前(大概8月份)做aop切换数据源的时候写的,这次相当于在这个基础上着重解决事务问题。
然后是将自定义注解作为切点,进行aop方式动态切换逻辑补全,创建DynamicDataSourceAspect.java:
import com.test.jtadbsource.dbConfig.DataSourceContextHolder;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.*;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
@Aspect
@Component
public class DynamicDataSourceAspect {
protected Logger logger = LoggerFactory.getLogger(getClass());
@Pointcut("@annotation(com.test.jtadbsource.dbAop.DataSource)")
public void dataSourcePointCut() {}
@Around(“dataSourcePointCut()”)
public Object around(ProceedingJoinPoint point) throws Throwable {
DataSource ds = null;
MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();
//获取自定义注解
ds = method.getAnnotation(DataSource.class);
if (ds == null) {
//如果监测到自定义注解不存在,那么默认切换到数据源 mydbone
DataSourceContextHolder.setDataSourceKey(DataSourceNames.ONE);
logger.info("set default datasource is " + DataSourceNames.ONE);
} else {
//自定义存在,则按照注解的值去切换数据源
DataSourceContextHolder.setDataSourceKey(ds.value());
logger.info("set datasource is " + ds.value());
}
return point.proceed();
}
@After(value = “dataSourcePointCut()”)
public void afterSwitchDS(JoinPoint point) {
DataSourceContextHolder.clearDataSourceKey();
logger.info(“clean datasource”);
}
}
上面用到的DataSourceContextHolder.java:
/**
 * Holds the name of the data source selected for the current thread.
 *
 * <p>The value lives in a {@link ThreadLocal}, so each thread sees only the key it set itself.
 */
public class DataSourceContextHolder {

    // Typed as String so getDataSourceKey() can return the stored value without a cast.
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();

    /**
     * Sets the data source name for the current thread.
     *
     * @param dbName the data source key to use
     */
    public static void setDataSourceKey(String dbName) {
        contextHolder.set(dbName);
    }

    /**
     * Returns the data source name of the current thread.
     *
     * @return the key previously set on this thread, or {@code null} if none is set
     */
    public static String getDataSourceKey() {
        return contextHolder.get();
    }

    /** Removes the data source name bound to the current thread. */
    public static void clearDataSourceKey() {
        contextHolder.remove();
    }
}
ok,到这里,动态切换的基本框架部分就都完成了,接下来是比较核心的:
1. DataSourceFactory.java :
用于配置不同数据源DataSource的信息,使用DruidXADataSource创建,以支持jta事务;
将不同数据源DataSource分别关联到对应的AtomikosDataSourceBean,这样事务才能交给JTA事务管理器统一管理;
重写数据源会话工厂,为每个数据源单独配置一个SqlSessionFactory;
配置重写的sqlSessionTemplate,将实际使用的不同数据源的sqlSession和spring的事务机制关联起来。
import com.alibaba.druid.pool.xa.DruidXADataSource;
import com.test.jtadbsource.dbAop.DataSourceNames;
import org.apache.ibatis.logging.stdout.StdOutImpl;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
@Configuration
@MapperScan(basePackages = DataSourceFactory.base_PACKAGES, sqlSessionTemplateRef = “sqlSessionTemplate”)
public class DataSourceFactory {
static final String base_PACKAGES = “com.test.jtadbsource.mapper”;
private static final String MAPPER_LOCATION = “classpath:mybatis/mapper
@Bean
@ConfigurationProperties(“spring.datasource.druid.mydbone”)
public DataSource druidDataSourceOne() {
return new DruidXADataSource();
}
@Bean
@ConfigurationProperties(“spring.datasource.druid.mydbtwo”)
public DataSource druidDataSourceTwo() {
return new DruidXADataSource();
}
@Bean
public DataSource dataSourceOne(DataSource druidDataSourceOne) {
AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
sourceBean.setXaDataSource((DruidXADataSource) druidDataSourceOne);
// 必须为数据源指定唯一标识
sourceBean.setPoolSize(5);
sourceBean.setTestQuery(“SELECT 1”);
sourceBean.setUniqueResourceName(“mydbone”);
return sourceBean;
}
@Bean
public DataSource dataSourceTwo(DataSource druidDataSourceTwo) {
AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
sourceBean.setXaDataSource((DruidXADataSource) druidDataSourceTwo);
sourceBean.setPoolSize(5);
sourceBean.setTestQuery(“SELECT 1”);
sourceBean.setUniqueResourceName(“mydbtwo”);
return sourceBean;
}
@Bean
public SqlSessionFactory sqlSessionFactoryOne(DataSource dataSourceOne)
throws Exception {
return createSqlSessionFactory(dataSourceOne);
}
@Bean
public SqlSessionFactory sqlSessionFactoryTwo(DataSource dataSourceTwo)
throws Exception {
return createSqlSessionFactory(dataSourceTwo);
}
@Bean
public CustomSqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactoryOne, SqlSessionFactory sqlSessionFactoryTwo) {
Map
sqlSessionFactoryMap.put(DataSourceNames.ONE, sqlSessionFactoryOne);
sqlSessionFactoryMap.put(DataSourceNames.TWO, sqlSessionFactoryTwo);
CustomSqlSessionTemplate customSqlSessionTemplate = new CustomSqlSessionTemplate(sqlSessionFactoryOne);
customSqlSessionTemplate.setTargetSqlSessionFactories(sqlSessionFactoryMap);
return customSqlSessionTemplate;
}
private SqlSessionFactory createSqlSessionFactory(DataSource dataSource) throws Exception {
SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
factoryBean.setDataSource(dataSource);
org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
//配置驼峰命名
configuration.setMapUnderscoreToCamelCase(true);
//配置sql日志
configuration.setLogImpl(StdOutImpl.class);
factoryBean.setConfiguration(configuration);
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
//配置读取mapper.xml路径
factoryBean.setMapperLocations(resolver.getResources(MAPPER_LOCATION));
return factoryBean.getObject();
}
}
上面用到的自定义CustomSqlSessionTemplate (重写SqlSessionTemplate):
import static java.lang.reflect.Proxy.newProxyInstance;
import static org.apache.ibatis.reflection.ExceptionUtil.unwrapThrowable;
import static org.mybatis.spring.SqlSessionUtils.closeSqlSession;
import static org.mybatis.spring.SqlSessionUtils.getSqlSession;
import static org.mybatis.spring.SqlSessionUtils.isSqlSessionTransactional;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.sql.Connection;
import java.util.List;
import java.util.Map;
import org.apache.ibatis.exceptions.PersistenceException;
import org.apache.ibatis.executor.BatchResult;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.ResultHandler;
import org.apache.ibatis.session.RowBounds;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.MyBatisExceptionTranslator;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.dao.support.PersistenceExceptionTranslator;
import org.springframework.util.Assert;
public class CustomSqlSessionTemplate extends SqlSessionTemplate {
private final SqlSessionFactory sqlSessionFactory;
private final ExecutorType executorType;
private final SqlSession sqlSessionProxy;
private final PersistenceExceptionTranslator exceptionTranslator;
private Map
private SqlSessionFactory defaultTargetSqlSessionFactory;
public void setTargetSqlSessionFactories(Map
this.targetSqlSessionFactories = targetSqlSessionFactories;
}
public void setDefaultTargetSqlSessionFactory(SqlSessionFactory defaultTargetSqlSessionFactory) {
this.defaultTargetSqlSessionFactory = defaultTargetSqlSessionFactory;
}
public CustomSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
this(sqlSessionFactory, sqlSessionFactory.getConfiguration().getDefaultExecutorType());
}
public CustomSqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType) {
this(sqlSessionFactory, executorType, new MyBatisExceptionTranslator(sqlSessionFactory.getConfiguration()
.getEnvironment().getDataSource(), true));
}
public CustomSqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator) {
super(sqlSessionFactory, executorType, exceptionTranslator);
this.sqlSessionFactory = sqlSessionFactory;
this.executorType = executorType;
this.exceptionTranslator = exceptionTranslator;
this.sqlSessionProxy = (SqlSession) newProxyInstance(
SqlSessionFactory.class.getClassLoader(),
new Class[] { SqlSession.class },
new SqlSessionInterceptor());
this.defaultTargetSqlSessionFactory = sqlSessionFactory;
}
//通过DataSourceContextHolder获取当前的会话工厂
@Override
public SqlSessionFactory getSqlSessionFactory() {
String dataSourceKey = DataSourceContextHolder.getDataSourceKey();
SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactories.get(dataSourceKey);
if (targetSqlSessionFactory != null) {
return targetSqlSessionFactory;
} else if (defaultTargetSqlSessionFactory != null) {
return defaultTargetSqlSessionFactory;
} else {
Assert.notNull(targetSqlSessionFactories, “Property ‘targetSqlSessionFactories’ or ‘defaultTargetSqlSessionFactory’ are required”);
Assert.notNull(defaultTargetSqlSessionFactory, “Property ‘defaultTargetSqlSessionFactory’ or ‘targetSqlSessionFactories’ are required”);
}
return this.sqlSessionFactory;
}
@Override
public Configuration getConfiguration() {
return this.getSqlSessionFactory().getConfiguration();
}
public ExecutorType getExecutorType() {
return this.executorType;
}
public PersistenceExceptionTranslator getPersistenceExceptionTranslator() {
return this.exceptionTranslator;
}
public T selectOne(String statement) {
return this.sqlSessionProxy. selectOne(statement);
}
public T selectOne(String statement, Object parameter) {
return this.sqlSessionProxy. selectOne(statement, parameter);
}
public
return this.sqlSessionProxy.
}
public
return this.sqlSessionProxy.
}
public
return this.sqlSessionProxy.
}
public List selectList(String statement) {
return this.sqlSessionProxy. selectList(statement);
}
public List selectList(String statement, Object parameter) {
return this.sqlSessionProxy. selectList(statement, parameter);
}
public List selectList(String statement, Object parameter, RowBounds rowBounds) {
return this.sqlSessionProxy. selectList(statement, parameter, rowBounds);
}
public void select(String statement, ResultHandler handler) {
this.sqlSessionProxy.select(statement, handler);
}
public void select(String statement, Object parameter, ResultHandler handler) {
this.sqlSessionProxy.select(statement, parameter, handler);
}
public void select(String statement, Object parameter, RowBounds rowBounds, ResultHandler handler) {
this.sqlSessionProxy.select(statement, parameter, rowBounds, handler);
}
public int insert(String statement) {
return this.sqlSessionProxy.insert(statement);
}
public int insert(String statement, Object parameter) {
return this.sqlSessionProxy.insert(statement, parameter);
}
public int update(String statement) {
return this.sqlSessionProxy.update(statement);
}
public int update(String statement, Object parameter) {
return this.sqlSessionProxy.update(statement, parameter);
}
public int delete(String statement) {
return this.sqlSessionProxy.delete(statement);
}
public int delete(String statement, Object parameter) {
return this.sqlSessionProxy.delete(statement, parameter);
}
public T getMapper(Class type) {
return getConfiguration().getMapper(type, this);
}
public void commit() {
throw new UnsupportedOperationException(“Manual commit is not allowed over a Spring managed SqlSession”);
}
public void commit(boolean force) {
throw new UnsupportedOperationException(“Manual commit is not allowed over a Spring managed SqlSession”);
}
public void rollback() {
throw new UnsupportedOperationException(“Manual rollback is not allowed over a Spring managed SqlSession”);
}
public void rollback(boolean force) {
throw new UnsupportedOperationException(“Manual rollback is not allowed over a Spring managed SqlSession”);
}
public void close() {
throw new UnsupportedOperationException(“Manual close is not allowed over a Spring managed SqlSession”);
}
public void clearCache() {
this.sqlSessionProxy.clearCache();
}
public Connection getConnection() {
return this.sqlSessionProxy.getConnection();
}
public List flushStatements() {
return this.sqlSessionProxy.flushStatements();
}
private class SqlSessionInterceptor implements InvocationHandler {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
final SqlSession sqlSession = getSqlSession(
CustomSqlSessionTemplate.this.getSqlSessionFactory(),
CustomSqlSessionTemplate.this.executorType,
CustomSqlSessionTemplate.this.exceptionTranslator);
try {
Object result = method.invoke(sqlSession, args);
if (!isSqlSessionTransactional(sqlSession, CustomSqlSessionTemplate.this.getSqlSessionFactory())) {
sqlSession.commit(true);
}
return result;
} catch (Throwable t) {
Throwable unwrapped = unwrapThrowable(t);
if (CustomSqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {
Throwable translated = CustomSqlSessionTemplate.this.exceptionTranslator
.translateExceptionIfPossible((PersistenceException) unwrapped);
if (translated != null) {
unwrapped = translated;
}
}
throw unwrapped;
} finally {
closeSqlSession(sqlSession, CustomSqlSessionTemplate.this.getSqlSessionFactory());
}
}
}
}
然后是XA分布式事务管理器,XATransactionManagerConfig.java:
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
/**
 * Declares the Atomikos-backed JTA beans and exposes them to Spring as the
 * {@link PlatformTransactionManager}.
 */
@Configuration
@EnableTransactionManagement
public class XATransactionManagerConfig {

    /**
     * Atomikos {@link UserTransaction} with its transaction timeout set to 10000.
     * NOTE(review): JTA defines this timeout in seconds - confirm 10000 is intended.
     */
    @Bean
    public UserTransaction userTransaction() throws Throwable {
        UserTransactionImp atomikosUserTx = new UserTransactionImp();
        atomikosUserTx.setTransactionTimeout(10000);
        return atomikosUserTx;
    }

    /** Atomikos {@link TransactionManager} with force-shutdown enabled. */
    @Bean
    public TransactionManager atomikosTransactionManager() {
        UserTransactionManager atomikosTxManager = new UserTransactionManager();
        atomikosTxManager.setForceShutdown(true);
        return atomikosTxManager;
    }

    /** Spring transaction manager built from the two JTA beans above. */
    @Bean
    public PlatformTransactionManager transactionManager(UserTransaction userTransaction,
            TransactionManager transactionManager) {
        JtaTransactionManager jtaTransactionManager =
                new JtaTransactionManager(userTransaction, transactionManager);
        return jtaTransactionManager;
    }
}
然后,在启动类上,去除掉自动加载的数据源配置类:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
/**
 * Application entry point. {@link DataSourceAutoConfiguration} is excluded from
 * auto-configuration.
 */
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class JtadbsourceApplication {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(JtadbsourceApplication.class);
        application.run(args);
    }
}
到这里,aop注解方式整合多数据源+分布式事务jta已经完毕了!
接下来就是使用测试环节,包括单数据源数据插入&事务回滚,多数据源切换插入&事务回滚:
首先创建实体类,User.java:
import lombok.Data;
import lombok.ToString;
@Data
@ToString
public class User {
private Integer id;
private String username;
private Integer age;
}
然后是UserMapper.java:
import com.test.jtadbsource.pojo.User;
import org.apache.ibatis.annotations.Mapper;
import org.springframework.stereotype.Repository;
@Mapper
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)