Spring Boot Integration with Druid + MyBatis + JTA Distributed Transactions; Complete Kafka Tutorial from Beginner to Advanced


**/

@Documented

@Target({ElementType.METHOD})

@Retention(RetentionPolicy.RUNTIME)

public @interface DataSource {

String value() default DataSourceNames.ONE;

}

Next, create DataSourceNames.java, which simply names the data sources:

public interface DataSourceNames {

String ONE = "ONE";

String TWO = "TWO";

}
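
As a minimal sketch of how such an annotation can be read at runtime, the following plain-Java example looks up the data source key declared on a method through reflection. The annotation is re-declared inside the sketch so that it compiles on its own; `AnnotationLookupSketch`, `DemoService` and `resolveKey` are hypothetical names used only for illustration and are not part of the article's project.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

public class AnnotationLookupSketch {

    // Same shape as the article's annotation, re-declared so the sketch is self-contained.
    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @interface DataSource {
        String value() default "ONE";
    }

    // Hypothetical service class, used only for illustration.
    static class DemoService {
        @DataSource("TWO")
        public void writeToSecondDb() {}

        @DataSource
        public void writeToDefaultDb() {}

        public void notAnnotated() {}
    }

    // Returns the key declared on the named public no-arg method, or the fallback if the annotation is absent.
    static String resolveKey(Class<?> type, String methodName, String fallback) {
        try {
            Method method = type.getMethod(methodName);
            DataSource ds = method.getAnnotation(DataSource.class);
            return ds == null ? fallback : ds.value();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No public no-arg method named " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveKey(DemoService.class, "writeToSecondDb", "ONE"));
        System.out.println(resolveKey(DemoService.class, "writeToDefaultDb", "ONE"));
        System.out.println(resolveKey(DemoService.class, "notAnnotated", "ONE"));
    }
}
```

The lookup only works because the annotation is retained at runtime (RetentionPolicy.RUNTIME); with a shorter retention, getAnnotation would return null.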

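PS: I actually wrote most of this back around August, when I was switching data sources with AOP. This time I am building on that work and focusing on the transaction problem.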

Next, use the custom annotation as the pointcut and fill in the AOP logic that switches data sources dynamically. Create DynamicDataSourceAspect.java:

import com.test.jtadbsource.dbConfig.DataSourceContextHolder;

import org.aspectj.lang.JoinPoint;

import org.aspectj.lang.ProceedingJoinPoint;

import org.aspectj.lang.annotation.*;

import org.aspectj.lang.reflect.MethodSignature;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.stereotype.Component;

import java.lang.reflect.Method;

@Aspect

@Component

public class DynamicDataSourceAspect {

protected Logger logger = LoggerFactory.getLogger(getClass());

@Pointcut("@annotation(com.test.jtadbsource.dbAop.DataSource)")

public void dataSourcePointCut() {}

@Around("dataSourcePointCut()")

public Object around(ProceedingJoinPoint point) throws Throwable {

DataSource ds = null;

MethodSignature signature = (MethodSignature) point.getSignature();

Method method = signature.getMethod();

// Read the custom annotation from the intercepted method

ds = method.getAnnotation(DataSource.class);

if (ds == null) {

// No annotation found: fall back to the default data source, mydbone

DataSourceContextHolder.setDataSourceKey(DataSourceNames.ONE);

logger.info("set default datasource is " + DataSourceNames.ONE);

} else {

// Annotation present: switch to the data source it names

DataSourceContextHolder.setDataSourceKey(ds.value());

logger.info("set datasource is " + ds.value());

}

return point.proceed();

}

@After(value = "dataSourcePointCut()")

public void afterSwitchDS(JoinPoint point) {

DataSourceContextHolder.clearDataSourceKey();

logger.info("clean datasource");

}

}
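
The aspect above sets the key before the target method runs and clears it afterwards. A rough plain-Java sketch of that same set, run, clear sequence, without Spring or AspectJ, might look like the following. `SwitchAroundSketch`, `withDataSource` and `KEY` are hypothetical names, and the ThreadLocal here only stands in for the article's DataSourceContextHolder.

```java
import java.util.function.Supplier;

public class SwitchAroundSketch {

    // Stand-in for the article's DataSourceContextHolder (assumption for this sketch).
    private static final ThreadLocal<String> KEY = new ThreadLocal<>();

    static String currentKey() {
        return KEY.get();
    }

    // Sets the key, runs the work, and always clears the key, even if the work throws.
    static <T> T withDataSource(String key, Supplier<T> work) {
        KEY.set(key);
        try {
            return work.get();
        } finally {
            KEY.remove();
        }
    }

    public static void main(String[] args) {
        // Inside the callback the key is visible to the current thread.
        String inside = withDataSource("TWO", SwitchAroundSketch::currentKey);
        System.out.println("inside: " + inside);

        // After the call the key has been removed again.
        System.out.println("after: " + currentKey());

        // A failing callback still leaves no key behind.
        try {
            withDataSource("TWO", () -> {
                throw new IllegalStateException("simulated failure");
            });
        } catch (IllegalStateException expected) {
            System.out.println("after failure: " + currentKey());
        }
    }
}
```

Clearing in a finally block matters on pooled threads: a key left behind by one request would otherwise be seen by the next request served by the same thread.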

The DataSourceContextHolder.java used above:

public class DataSourceContextHolder {

private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();

// Set the data source key

public static void setDataSourceKey(String dbName) {

contextHolder.set(dbName);

}

// Get the data source key

public static String getDataSourceKey() {

return (contextHolder.get());

}

// Clear the data source key

public static void clearDataSourceKey() {

contextHolder.remove();

}

}
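
Because the holder is built on a ThreadLocal, each thread sees only the key it set itself. The sketch below illustrates that isolation in plain Java; `ThreadLocalIsolationSketch` and its method names are hypothetical and exist only for this example.

```java
public class ThreadLocalIsolationSketch {

    // Stand-in for the holder's ThreadLocal (assumption for this sketch).
    private static final ThreadLocal<String> KEY = new ThreadLocal<>();

    // The caller sets callerKey, another thread sets otherKey; returns what the caller still sees.
    static String keySeenByCallerAfterOtherThreadSets(String callerKey, String otherKey) {
        KEY.set(callerKey);
        try {
            Thread other = new Thread(() -> KEY.set(otherKey));
            other.start();
            other.join();
            return KEY.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            KEY.remove();
        }
    }

    // The caller sets callerKey; returns what a freshly started thread sees.
    static String keySeenByFreshThread(String callerKey) {
        KEY.set(callerKey);
        String[] seen = new String[1];
        try {
            Thread fresh = new Thread(() -> seen[0] = KEY.get());
            fresh.start();
            fresh.join();
            return seen[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            KEY.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(keySeenByCallerAfterOtherThreadSets("ONE", "TWO")); // prints ONE
        System.out.println(keySeenByFreshThread("ONE"));                       // prints null
    }
}
```

One consequence worth keeping in mind: work handed off to another thread does not inherit the key chosen on the calling thread.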

OK, that completes the basic scaffolding for dynamic switching. Next comes the core part:

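1. DataSourceFactory.java:

Configures each DataSource. The data sources are created with DruidXADataSource, which supports JTA transactions.

Wraps each DataSource in its own AtomikosDataSourceBean, so that its transactions are handed over to the JTA transaction manager.

Overrides the SqlSessionFactory setup, so that each data source gets its own factory.

Configures the overridden sqlSessionTemplate, which ties the SqlSession of whichever data source is actually in use to Spring's transaction mechanism.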

import com.alibaba.druid.pool.xa.DruidXADataSource;

import com.test.jtadbsource.dbAop.DataSourceNames;

import org.apache.ibatis.logging.stdout.StdOutImpl;

import org.apache.ibatis.session.SqlSessionFactory;

import org.mybatis.spring.SqlSessionFactoryBean;

import org.mybatis.spring.annotation.MapperScan;

import org.springframework.boot.context.properties.ConfigurationProperties;

import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import org.springframework.core.io.support.ResourcePatternResolver;

import javax.sql.DataSource;

import java.util.HashMap;

import java.util.Map;

@Configuration

@MapperScan(basePackages = DataSourceFactory.base_PACKAGES, sqlSessionTemplateRef = "sqlSessionTemplate")

public class DataSourceFactory {

static final String base_PACKAGES = "com.test.jtadbsource.mapper";

private static final String MAPPER_LOCATION = “classpath:mybatis/mapper

@Bean

@ConfigurationProperties("spring.datasource.druid.mydbone")

public DataSource druidDataSourceOne() {

return new DruidXADataSource();

}

@Bean

@ConfigurationProperties("spring.datasource.druid.mydbtwo")

public DataSource druidDataSourceTwo() {

return new DruidXADataSource();

}

@Bean

public DataSource dataSourceOne(DataSource druidDataSourceOne) {

AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();

sourceBean.setXaDataSource((DruidXADataSource) druidDataSourceOne);

// Each data source must be given a unique resource name

sourceBean.setPoolSize(5);

sourceBean.setTestQuery("SELECT 1");

sourceBean.setUniqueResourceName("mydbone");

return sourceBean;

}

@Bean

public DataSource dataSourceTwo(DataSource druidDataSourceTwo) {

AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();

sourceBean.setXaDataSource((DruidXADataSource) druidDataSourceTwo);

sourceBean.setPoolSize(5);

sourceBean.setTestQuery("SELECT 1");

sourceBean.setUniqueResourceName("mydbtwo");

return sourceBean;

}

@Bean

public SqlSessionFactory sqlSessionFactoryOne(DataSource dataSourceOne)

throws Exception {

return createSqlSessionFactory(dataSourceOne);

}

@Bean

public SqlSessionFactory sqlSessionFactoryTwo(DataSource dataSourceTwo)

throws Exception {

return createSqlSessionFactory(dataSourceTwo);

}

@Bean

public CustomSqlSessionTemplate sqlSessionTemplate(SqlSessionFactory sqlSessionFactoryOne, SqlSessionFactory sqlSessionFactoryTwo) {

Map<Object, SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>();

sqlSessionFactoryMap.put(DataSourceNames.ONE, sqlSessionFactoryOne);

sqlSessionFactoryMap.put(DataSourceNames.TWO, sqlSessionFactoryTwo);

CustomSqlSessionTemplate customSqlSessionTemplate = new CustomSqlSessionTemplate(sqlSessionFactoryOne);

customSqlSessionTemplate.setTargetSqlSessionFactories(sqlSessionFactoryMap);

return customSqlSessionTemplate;

}

private SqlSessionFactory createSqlSessionFactory(DataSource dataSource) throws Exception {

SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();

factoryBean.setDataSource(dataSource);

org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();

// Map underscore column names to camelCase properties

configuration.setMapUnderscoreToCamelCase(true);

// Log SQL statements to standard output

configuration.setLogImpl(StdOutImpl.class);

factoryBean.setConfiguration(configuration);

ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();

// Set the location pattern used to load the mapper XML files

factoryBean.setMapperLocations(resolver.getResources(MAPPER_LOCATION));

return factoryBean.getObject();

}

}

The custom CustomSqlSessionTemplate used above (it overrides SqlSessionTemplate):

import static java.lang.reflect.Proxy.newProxyInstance;

import static org.apache.ibatis.reflection.ExceptionUtil.unwrapThrowable;

import static org.mybatis.spring.SqlSessionUtils.closeSqlSession;

import static org.mybatis.spring.SqlSessionUtils.getSqlSession;

import static org.mybatis.spring.SqlSessionUtils.isSqlSessionTransactional;

import java.lang.reflect.InvocationHandler;

import java.lang.reflect.Method;

import java.sql.Connection;

import java.util.List;

import java.util.Map;

import org.apache.ibatis.exceptions.PersistenceException;

import org.apache.ibatis.executor.BatchResult;

import org.apache.ibatis.session.Configuration;

import org.apache.ibatis.session.ExecutorType;

import org.apache.ibatis.session.ResultHandler;

import org.apache.ibatis.session.RowBounds;

import org.apache.ibatis.session.SqlSession;

import org.apache.ibatis.session.SqlSessionFactory;

import org.mybatis.spring.MyBatisExceptionTranslator;

import org.mybatis.spring.SqlSessionTemplate;

import org.springframework.dao.support.PersistenceExceptionTranslator;

import org.springframework.util.Assert;

public class CustomSqlSessionTemplate extends SqlSessionTemplate {

private final SqlSessionFactory sqlSessionFactory;

private final ExecutorType executorType;

private final SqlSession sqlSessionProxy;

private final PersistenceExceptionTranslator exceptionTranslator;

private Map<Object, SqlSessionFactory> targetSqlSessionFactories;

private SqlSessionFactory defaultTargetSqlSessionFactory;

public void setTargetSqlSessionFactories(Map<Object, SqlSessionFactory> targetSqlSessionFactories) {

this.targetSqlSessionFactories = targetSqlSessionFactories;

}

public void setDefaultTargetSqlSessionFactory(SqlSessionFactory defaultTargetSqlSessionFactory) {

this.defaultTargetSqlSessionFactory = defaultTargetSqlSessionFactory;

}

public CustomSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {

this(sqlSessionFactory, sqlSessionFactory.getConfiguration().getDefaultExecutorType());

}

public CustomSqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType) {

this(sqlSessionFactory, executorType, new MyBatisExceptionTranslator(sqlSessionFactory.getConfiguration()

.getEnvironment().getDataSource(), true));

}

public CustomSqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType,

PersistenceExceptionTranslator exceptionTranslator) {

super(sqlSessionFactory, executorType, exceptionTranslator);

this.sqlSessionFactory = sqlSessionFactory;

this.executorType = executorType;

this.exceptionTranslator = exceptionTranslator;

this.sqlSessionProxy = (SqlSession) newProxyInstance(

SqlSessionFactory.class.getClassLoader(),

new Class[] { SqlSession.class },

new SqlSessionInterceptor());

this.defaultTargetSqlSessionFactory = sqlSessionFactory;

}

// Look up the SqlSessionFactory for the current thread's key in DataSourceContextHolder

@Override

public SqlSessionFactory getSqlSessionFactory() {

String dataSourceKey = DataSourceContextHolder.getDataSourceKey();

SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactories.get(dataSourceKey);

if (targetSqlSessionFactory != null) {

return targetSqlSessionFactory;

} else if (defaultTargetSqlSessionFactory != null) {

return defaultTargetSqlSessionFactory;

} else {

Assert.notNull(targetSqlSessionFactories, "Property 'targetSqlSessionFactories' or 'defaultTargetSqlSessionFactory' are required");

Assert.notNull(defaultTargetSqlSessionFactory, "Property 'defaultTargetSqlSessionFactory' or 'targetSqlSessionFactories' are required");

}

return this.sqlSessionFactory;

}

@Override

public Configuration getConfiguration() {

return this.getSqlSessionFactory().getConfiguration();

}

public ExecutorType getExecutorType() {

return this.executorType;

}

public PersistenceExceptionTranslator getPersistenceExceptionTranslator() {

return this.exceptionTranslator;

}

public <T> T selectOne(String statement) {

return this.sqlSessionProxy.<T> selectOne(statement);

}

public <T> T selectOne(String statement, Object parameter) {

return this.sqlSessionProxy.<T> selectOne(statement, parameter);

}

public <K, V> Map<K, V> selectMap(String statement, String mapKey) {

return this.sqlSessionProxy.<K, V> selectMap(statement, mapKey);

}

public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey) {

return this.sqlSessionProxy.<K, V> selectMap(statement, parameter, mapKey);

}

public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey, RowBounds rowBounds) {

return this.sqlSessionProxy.<K, V> selectMap(statement, parameter, mapKey, rowBounds);

}

public <E> List<E> selectList(String statement) {

return this.sqlSessionProxy.<E> selectList(statement);

}

public <E> List<E> selectList(String statement, Object parameter) {

return this.sqlSessionProxy.<E> selectList(statement, parameter);

}

public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {

return this.sqlSessionProxy.<E> selectList(statement, parameter, rowBounds);

}

public void select(String statement, ResultHandler handler) {

this.sqlSessionProxy.select(statement, handler);

}

public void select(String statement, Object parameter, ResultHandler handler) {

this.sqlSessionProxy.select(statement, parameter, handler);

}

public void select(String statement, Object parameter, RowBounds rowBounds, ResultHandler handler) {

this.sqlSessionProxy.select(statement, parameter, rowBounds, handler);

}

public int insert(String statement) {

return this.sqlSessionProxy.insert(statement);

}

public int insert(String statement, Object parameter) {

return this.sqlSessionProxy.insert(statement, parameter);

}

public int update(String statement) {

return this.sqlSessionProxy.update(statement);

}

public int update(String statement, Object parameter) {

return this.sqlSessionProxy.update(statement, parameter);

}

public int delete(String statement) {

return this.sqlSessionProxy.delete(statement);

}

public int delete(String statement, Object parameter) {

return this.sqlSessionProxy.delete(statement, parameter);

}

public <T> T getMapper(Class<T> type) {

return getConfiguration().getMapper(type, this);

}

public void commit() {

throw new UnsupportedOperationException("Manual commit is not allowed over a Spring managed SqlSession");

}

public void commit(boolean force) {

throw new UnsupportedOperationException("Manual commit is not allowed over a Spring managed SqlSession");

}

public void rollback() {

throw new UnsupportedOperationException("Manual rollback is not allowed over a Spring managed SqlSession");

}

public void rollback(boolean force) {

throw new UnsupportedOperationException("Manual rollback is not allowed over a Spring managed SqlSession");

}

public void close() {

throw new UnsupportedOperationException("Manual close is not allowed over a Spring managed SqlSession");

}

public void clearCache() {

this.sqlSessionProxy.clearCache();

}

public Connection getConnection() {

return this.sqlSessionProxy.getConnection();

}

public List<BatchResult> flushStatements() {

return this.sqlSessionProxy.flushStatements();

}

private class SqlSessionInterceptor implements InvocationHandler {

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

final SqlSession sqlSession = getSqlSession(

CustomSqlSessionTemplate.this.getSqlSessionFactory(),

CustomSqlSessionTemplate.this.executorType,

CustomSqlSessionTemplate.this.exceptionTranslator);

try {

Object result = method.invoke(sqlSession, args);

if (!isSqlSessionTransactional(sqlSession, CustomSqlSessionTemplate.this.getSqlSessionFactory())) {

sqlSession.commit(true);

}

return result;

} catch (Throwable t) {

Throwable unwrapped = unwrapThrowable(t);

if (CustomSqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {

Throwable translated = CustomSqlSessionTemplate.this.exceptionTranslator

.translateExceptionIfPossible((PersistenceException) unwrapped);

if (translated != null) {

unwrapped = translated;

}

}

throw unwrapped;

} finally {

closeSqlSession(sqlSession, CustomSqlSessionTemplate.this.getSqlSessionFactory());

}

}

}

}
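
The core idea of the template above is a JDK dynamic proxy whose handler picks the real target on every call, based on the key held for the current thread. The following stripped-down sketch shows that routing idea with a toy interface instead of SqlSession. `RoutingProxySketch`, `Session`, `routingProxy` and `describeWithKey` are hypothetical names, and the sketch deliberately leaves out transactions, exception translation and session closing.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

public class RoutingProxySketch {

    // Toy stand-in for SqlSession (assumption for this sketch).
    interface Session {
        String describe();
    }

    // Stand-in for the article's DataSourceContextHolder.
    private static final ThreadLocal<String> KEY = new ThreadLocal<>();

    // Builds a proxy that resolves the real target at call time, falling back to a default.
    static Session routingProxy(Map<String, Session> targets, Session fallback) {
        InvocationHandler handler = (proxy, method, args) -> {
            String key = KEY.get();
            Session target = key == null ? null : targets.get(key);
            if (target == null) {
                target = fallback;
            }
            return method.invoke(target, args);
        };
        return (Session) Proxy.newProxyInstance(
                Session.class.getClassLoader(),
                new Class<?>[] {Session.class},
                handler);
    }

    // Calls the proxy with the given key set for the current thread, then clears the key.
    static String describeWithKey(Session proxy, String key) {
        KEY.set(key);
        try {
            return proxy.describe();
        } finally {
            KEY.remove();
        }
    }

    public static void main(String[] args) {
        Map<String, Session> targets = new HashMap<>();
        targets.put("ONE", () -> "session for mydbone");
        targets.put("TWO", () -> "session for mydbtwo");
        Session proxy = routingProxy(targets, targets.get("ONE"));

        System.out.println(describeWithKey(proxy, "TWO"));
        System.out.println(describeWithKey(proxy, "UNKNOWN"));
        System.out.println(proxy.describe());
    }
}
```

The same proxy object serves every caller; only the thread-bound key decides which target handles a given call, which is why the key has to be set before the mapper method is invoked.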

Next is the XA distributed transaction manager, XATransactionManagerConfig.java:

import com.atomikos.icatch.jta.UserTransactionImp;

import com.atomikos.icatch.jta.UserTransactionManager;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.transaction.PlatformTransactionManager;

import org.springframework.transaction.annotation.EnableTransactionManagement;

import org.springframework.transaction.jta.JtaTransactionManager;

import javax.transaction.TransactionManager;

import javax.transaction.UserTransaction;

@Configuration

@EnableTransactionManagement

public class XATransactionManagerConfig {

@Bean

public UserTransaction userTransaction() throws Throwable {

UserTransactionImp userTransactionImp = new UserTransactionImp();

userTransactionImp.setTransactionTimeout(10000); // JTA specifies this timeout in seconds

return userTransactionImp;

}

@Bean

public TransactionManager atomikosTransactionManager() {

UserTransactionManager userTransactionManager = new UserTransactionManager();

userTransactionManager.setForceShutdown(true);

return userTransactionManager;

}

@Bean

public PlatformTransactionManager transactionManager(UserTransaction userTransaction,

TransactionManager transactionManager) {

return new JtaTransactionManager(userTransaction, transactionManager);

}

}
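
For readers unfamiliar with what an XA transaction manager coordinates, here is a toy illustration of the two-phase commit idea: every participant is first asked to prepare, and the work is committed only if all of them vote yes. This is not how Atomikos is implemented (there is no recovery log, no timeout handling and no real XAResource), and `TwoPhaseCommitSketch`, `Participant` and `simulate` are hypothetical names for this sketch only.

```java
import java.util.ArrayList;
import java.util.List;

public class TwoPhaseCommitSketch {

    // Toy participant that records each call and votes as configured.
    static class Participant {
        private final String name;
        private final boolean votesYes;
        private final List<String> log;

        Participant(String name, boolean votesYes, List<String> log) {
            this.name = name;
            this.votesYes = votesYes;
            this.log = log;
        }

        boolean prepare() {
            log.add(name + ":prepare");
            return votesYes;
        }

        void commit() {
            log.add(name + ":commit");
        }

        void rollback() {
            log.add(name + ":rollback");
        }
    }

    // Phase 1: ask everyone to prepare. Phase 2: commit all, or roll back all on any "no" vote.
    static boolean run(List<Participant> participants) {
        for (Participant p : participants) {
            if (!p.prepare()) {
                for (Participant q : participants) {
                    q.rollback();
                }
                return false;
            }
        }
        for (Participant p : participants) {
            p.commit();
        }
        return true;
    }

    // Runs the protocol over two participants and returns the ordered call log.
    static List<String> simulate(boolean firstVotesYes, boolean secondVotesYes) {
        List<String> log = new ArrayList<>();
        List<Participant> participants = new ArrayList<>();
        participants.add(new Participant("mydbone", firstVotesYes, log));
        participants.add(new Participant("mydbtwo", secondVotesYes, log));
        run(participants);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(simulate(true, true));
        System.out.println(simulate(true, false));
    }
}
```

In the second run, mydbone has already prepared successfully but is still rolled back because mydbtwo voted no, which is the all-or-nothing behavior the JTA setup above is meant to provide across both databases.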

Then, on the application's main class, exclude the auto-configured data source class:

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})

public class JtadbsourceApplication {

public static void main(String[] args) {

SpringApplication.run(JtadbsourceApplication.class, args);

}

}

At this point, the annotation-driven AOP integration of multiple data sources with JTA distributed transactions is complete!

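Next comes testing: insert and transaction rollback against a single data source, then insert and transaction rollback while switching between multiple data sources: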


First, create the entity class, User.java:

import lombok.Data;

import lombok.ToString;

@Data

@ToString

public class User {

private Integer id;

private String username;

private Integer age;

}

Then UserMapper.java:

import com.test.jtadbsource.pojo.User;

import org.apache.ibatis.annotations.Mapper;

import org.springframework.stereotype.Repository;

@Mapper

Sharing is welcome. When reposting, please credit the source: 内存溢出

Original article: https://outofmemory.cn/zaji/5679284.html
