什么是@Transactional? @Transactional是Spring这种用于处理事务的注解,基于拦截器进行commit或rollback
使用例子下面举一个加了@Transactional注解的方法addUser(),并且调用了另一个隔离级别为NESTED的addUser2()方法
@Service public class UserService { // .. 省略其他 @Transactional public void addUser() { userMapper.addItem("nA", "cA"); // 调用的addUser2()方法是嵌套模式 ((UserService) applicationContext.getBean(UserService.class)).addUser2(); userMapper.addItem("nB", "cB"); } @Transactional(propagation = Propagation.NESTED) public void addUser2() { userMapper.addItem("nC", "cC"); } }执行流程时序图
实际上上面的例子是两个事务。是怎么实现的,见步骤如下
上述例子调用执行步骤-
拦截器intercept拦截,由于service类没继承接口,进入CglibAopProxy的intercept拦截器中
-
调用getInterceptorsAndDynamicInterceptionAdvice()方法,然后工厂生成拦截chain集合(拦截器列表),
然后将拦截器、target等封装成一个对象CglibMethodInvocation,然后调用proceed执行事务逻辑 -
走到这步,开始执行事务逻辑。通过TransactionInterceptor类执行invokeWithinTransaction(), 然后走到TransactionAspectSupport类(重点类!!含事务处理逻辑)
-
走到TransactionAspectSupport类,其具体逻辑如下(重点)
- 4.1 getTransaction()获取事务管理器,然后doGetTransaction()获取事务连接上下文。保存线程上下文在ThreadLocal
- 4.2 判断是否存在事务,存在则:
以下是各个隔离级别的处理方式 - PROPAGATION_NEVER:抛异常 - PROPAGATION_NOT_SUPPORTED:不加事务 - PROPAGATION_REQUIRES_NEW:创建新事务 - PROPAGATION_NESTED:创建保存点 - PROPAGATION_SUPPORTS 或 PROPAGATION_REQUIRED:合并已有的事务
- 4.3 不存在事务走这个逻辑:
以下是各个隔离级别的处理方式 - PROPAGATION_MANDATORY:抛异常 - PROPAGATION_REQUIRED 或 PROPAGATION_REQUIRES_NEW 或 PROPAGATION_NESTED:创建一个新事务。会将mysql自动提交关闭,然后标记当前事务开启。 - 其他传播级别:创建一个空事务
- 4.4 prepareTransactionInfo()把事务信息TransactionInfo绑定到线程上下文
- 然后执行invocation.proceedWithInvocation()执行到切面业务代码逻辑,会执行到invokeJoinpoint()真正调用业务代码逻辑 ,就是上面使用例子的逻辑
@Service public class UserService { // .. 省略其他 @Transactional public void addUser() { userMapper.addItem("nA", "cA"); // 调用的addUser2()方法是嵌套模式 ((UserService) applicationContext.getBean(UserService.class)).addUser2(); userMapper.addItem("nB", "cB"); } @Transactional(propagation = Propagation.NESTED) public void addUser2() { userMapper.addItem("nC", "cC"); } }
- 执行完之后回到TransactionAspectSupport,执行commit或rollback逻辑
步骤如下: [正常执行,提交事务]- 如果有保存点,释放保存点 - 新事务的话,获取数据库连接并提交事务con.commit() [正常执行,回滚事务] - 如果有保存点,回滚保存点 - 新事务的话,回滚事务con.rollback()源码流程
class CglibAopProxy implements AopProxy, Serializable { // 省略其他逻辑。。。。。 @Override public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable { // 调用Gglib动态代理 省略其他逻辑。。。。。 retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed(); // 省略其他逻辑。。。。。 } } public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean { // 省略其他逻辑。。。。。 // 创建事务(重要!!!!重要!!!!) TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); // 创建事务的核心方法==================================== protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { // 省略其他逻辑。。。 TransactionStatus status = null; if (txAttr != null) { if (tm != null) { status = tm.getTransaction(txAttr); } } // prepareTransactionInfo()把事务信息TransactionInfo绑定到线程上下文 省略其他逻辑。。。 return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); } // 获取事务 public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) { // 在spring中,doXxx都是干正事的,所以这里才是调用事务的方法 Object transaction = doGetTransaction(); // 处理事务已经存在的情况,比如例子中调用addUser2就会调到这里来 if (isExistingTransaction(transaction)) { return handleExistingTransaction(def, transaction, debugEnabled); } // 事务超时,默认没超时限制 if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout()); } // PROPAGATION_MANDATORY 不允许有事务,有的话直接抛异常 if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'"); } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // 走到这步,说明 可以创建新事务的,共三种:PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED SuspendedResourcesHolder suspendedResources = suspend(null); // 省略其他代码。。。。 return startTransaction(def, transaction, debugEnabled, suspendedResources); } else { // 创建空事务 Create "empty" transaction: no actual transaction, but potentially synchronization. boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null); } } // 启动一个新事务(通过上面方法的 startTransaction() 可调用到) protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;// 获取 事务数据源 Connection con = null; try { if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { Connection newCon = obtainDataSource().getConnection(); if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); } // 没有数据库连接connection,就创建一个新的放进去 txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } // 设置事务同步,并且获取 数据库connection,并设置其他参数 txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); txObject.setReadOnly(definition.isReadOnly()); // 如果当前mysql开启了自动提交,关闭mysql自动提交,并且给当前事务对象autoCommit提交设置为true(就是改为当前spring自己控制) if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); con.setAutoCommit(false); } prepareTransactionalConnection(con, definition); txObject.getConnectionHolder().setTransactionActive(true);// 设置事务激活 int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } // Bind the connection holder to the thread. if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder()); } } // 省略其他代码。。。 } // 上面getTransaction()方法如果判断到已存在事务,会走这个逻辑 private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled) { // TransactionDefinition.PROPAGATION_NEVER:不能存在事务,抛异常 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'"); } // 不支持事务,所以会挂起当前事务 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } // 事务挂起的作用是什么? // - 方法A调用方法B,方法A打开的 Transaction将挂起,方法B中任何数据库操作,都不在该Transaction的管理之下 // - 是怎么操作挂起的?其实就是把他从threadLocal中移除除去(ThreadLocal总结
这个事务注解很简单。其实就是通过拦截器拦截注解,然后加载注解参数,根据隔离级别来判断是否需要创建事务,
如果有多个事务隔离级别,就会涉及到事务保存点, 或者是不同事务之间还会涉及掉事务挂起
- 事务保存点:依托mysql数据库机制,用于回滚部分逻辑。比如事务有三个时间点(A->B1->B2->C),其中B1->B2是事务保存点,
如果这时候B失败,并没有抛异常到上一个方法,这时候只会回滚B1->B2执行的sql逻辑
– 嵌套事务NESTED就是依托于事务保存点来处理,这就是为什么出异常子方法回滚了,而调用它的方法事务不会回滚(正常执行的时候子方法执行完会移除事务保存点)
-- 用法如下: BEGIN TRANSACTION A; SELECT 2; INSERT INTO TABLE1 (xx) VALUES ("xxx"); SAVE TRANSACTION B Point; // B1 启动事务保存点 INSERT INTO TABLE (xx) VALUES ("xxx"); ROLLBACK TRANSACTION B Point; // B2 回滚事务保存点的数据 SELECT 1; INSERT INTO TABLE2 (xx) VALUES ("xxx"); COMMIT TRANSACTION A;
- 事务挂起:事务挂起很简单,就是移除treadLocal中数据暂存到其他地方,避免自动提交
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)