我们在使用Spring事务机制时,需要在配置类创建一个事务管理器的实例,用于处理事务提交、回滚、关闭等 *** 作
定义一个DataSourceTransactionManager实例的事务管理器
@Bean public PlatformTransactionManager transactionManager() { DataSourceTransactionManager transactionManager = new DataSourceTransactionManager(); transactionManager.setDataSource(dataSource()); return transactionManager; } @Bean public DataSource dataSource() { DriverManagerDataSource dataSource = new DriverManagerDataSource(); dataSource.setUrl("jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf-8&useSSL=false"); dataSource.setUsername("root"); dataSource.setPassword("ROOT"); return dataSource; }
在详细介绍Spring事务执行的源码之前,先大概介绍一个事务执行的大概流程,其中重点介绍一下Spring事务的传播机制
1.1 基本流程在上面生成Advise实例中,生成的TransactionInterceptor对象其实就是一个methodInterceptor对象,在执行事务方法时,会调用TransactionInterceptor对象的invoke()方法,基本执行流程为:
- 利用所配置的PlatformTransactionManager事务管理器新建一个数据库连接
- 修改数据库连接的autocommit为false,这样才能由Spring来控制提交或回滚逻辑
- 执行MethodInvocation.proceed()方法,简单理解就是去执行事务方法,其中就会执行SQL
- 如果没有异常,则提交
- 如果抛出异常,就回滚
然而在真正的开发工作中,肯定还会出现方法之间的调用,以a()方法和b()方法为例,a()方法中调用b()方法,那么a()方法中的事务是否也可以用在b()方法中呢?
针对这个问题,Spring提供了事务传播机制,用来解决方法调用时,事务的作用域问题
下面以一个简单的例子介绍一个Spring事务传播机制的实现过程,假设a()方法和b()方法都使用了@Transactional注解,a()方法在一个事务中执行,调用b()方法时,需要重新开一个事务执行:
- 代理对象执行a()方法之前,需要先利用事务管理器新建一个数据库连接conn
- 将数据库连接conn的autocommit改为false
- 把数据库连接conn设置到ThreadLocal中,(这样可以保证同一个线程中,用的都是用一个数据库连接)
- 执行a()方法中的SQL
- 执行a()方法过程中,调用b()方法(注意用代理对象调用,否则事务不生效)
- 代理对象执行b()方法前,首先判断当前线程已经存在一个数据库连接conn,表示当前线程已经拥有了一个Spring事务,则将该事务挂起
- 挂起就是把ThreadLoacl中的数据库连接conn从ThreadLocal中移除,并放入到一个挂起资源对象
- 挂起完成后,再利用事务管理器新建一个数据库连接reconn
- 将数据库连接reconn的autocommit设置为false
- 把数据库连接reconn设置到ThreadLocal中
- 执行b()方法中的SQL
- b()方法执行完成之后,则从ThreadLocal中拿到数据库连接reconn进行提交
- 提交完成之后恢复所挂起的数据库连接conn,这里的恢复就是把在挂起资源对象中所保存的数据库连接conn再次设置到ThreadLocal中
- a()方法正常执行完,则从ThreadLocal中拿到数据库连接conn进行提交
注:在执行某方方法时,判断当前是否存在一个事务,就是判断当前线程的ThreadLocal中是否存在一个数据库连接对象,如果存在则表示以及存在一个事务了
如果以非事务运行时,表示在执行方法时,Spring事务管理器不会去建立数据库连接,执行SQL时,由Mybatis或JdbcTemplate建立数据库连接来执行SQL
Spring提供了7种传播机制:
- REQUIRED:没有事务则新建,有则直接在该事务上执行
- SUPPORTS:如果有事务,直接在该事务上运行,如果没有事务,就以非事务方式运行
- MANDATORY:如果没有事务,直接抛异常;如果有事务,直接在当前事务运行
- REQUIRES_NEW:创建一个新的事务,并挂起当前的事务
- NOT_SUPPORTED:以非事务方式运行,如果存在事务,则把当前事务挂起
- NEVER:以非事务方式运行,如果存在事务,则抛出异常
- NESTED:同MySQL的savepoint,如果存在事务,则在事务内部运行(可以单独回滚),如果没有事务则新建事务
在前面已经说过,在执行事务方法时,会调用TransactionInterceptor的invoke()方法,这里使用匿名内部类创建了一个CoroutinesInvocationCallback对象,而该对象是在开始事务之后,才会去调用里面的proceedWithInvocation()方法,而在invokeWithinTransaction()方法里面,会去创建事务等
public Object invoke(MethodInvocation invocation) throws Throwable { // Work out the target class: may be {@code null}. // The TransactionAttributeSource should be passed the target class // as well as the method, which may be from an interface. Class> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() { @Override @Nullable public Object proceedWithInvocation() throws Throwable { return invocation.proceed(); } @Override public Object getTarget() { return invocation.getThis(); } @Override public Object[] getArguments() { return invocation.getArguments(); } }); }2.1 获取事务管理器
在《Spring 事务源码分析(上)》的生成Advise实例时,设置了TransactionAttributeSource实例,这个实例更像是一个工具类,在各种解析的时候都需要用到它
在invokeWithinTransaction()方法内部,首先就是获取TransactionAttributeSource实例,然后调用它的getTransactionAttribute()方法,将method方法上@Transactional注解的配置生成一个TransactionAttribute对象,这个对象包含了事务的所有配置
接着就是调用determineTransactionManager()去获取一个事务管理器实例
protected Object invokeWithinTransaction(Method method, @Nullable Class> targetClass, final InvocationCallback invocation) throws Throwable { // If the transaction attribute is null, the method is non-transactional. // TransactionAttribute就是@Transactional中的配置 TransactionAttributeSource tas = getTransactionAttributeSource(); final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); final TransactionManager tm = determineTransactionManager(txAttr); …… }
在文章的开始位置,定义了一个DataSourceTransactionManager事务管理器对象,在@Transactional注解的value()和transactionManager()配置可以指定事务管理器的名称,在获取事务管理器时,首先根据配置来获取;如果没有配置,则直接调用getBean()方法,通过指定TransactionManager类型,获取事务管理器的实例,最后进行缓存
protected TransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) { …… // @Transactional注解指定的事务管理器名称 String qualifier = txAttr.getQualifier(); if (StringUtils.hasText(qualifier)) { return determineQualifiedTransactionManager(this.beanFactory, qualifier); } else if (StringUtils.hasText(this.transactionManagerBeanName)) { return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName); } else { TransactionManager defaultTransactionManager = getTransactionManager(); if (defaultTransactionManager == null) { defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY); if (defaultTransactionManager == null) { // 直接从容器获取TransactionManager 事务管理器实例 defaultTransactionManager = this.beanFactory.getBean(TransactionManager.class); this.transactionManagerCache.putIfAbsent( DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager); } } return defaultTransactionManager; } }2.2 创建事务
获取事务管理器之后,就可以创建事务了,但在创建事务之前,需要去生成一个JoinPoint,通常把方法名作为连接点
在这里创建事务时,返回可能是现有的事务,也可能是一个新的事务,这与方法上定义的事务传播机制有关
所以这里把TransactionInfo理解成是一个逻辑事务,事务创建完成之后,就会去回调proceedWithInvocation()方法,这个方法可能是去执行下一个MethodInterceptor,也可能就是直接调用事务方法
执行过程中过程中出现异常,会调用completeTransactionAfterThrowing()方法回滚,最后从ThreadLocal中清除当前事务,最后就是调用commitTransactionAfterReturning()方法去提交事务
PlatformTransactionManager ptm = asPlatformTransactionManager(tm); // joinpoint的唯一标识 final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. // 如果有必要就创建事务,这里就涉及到事务传播机制的实现了 // TransactionInfo表示一个逻辑事务,比如两个逻辑事务属于同一个物理事务 TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); Object retVal; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. // 执行下一个Interceptor或被代理对象中的方法 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { cleanupTransactionInfo(txInfo); } …… // 提交事务 commitTransactionAfterReturning(txInfo); return retVal; }
下面重点介绍createTransactionIfNecessary()方法是如何创建事务的
2.2.1 创建TransactionStatus如果有@Transactional注解,并且也有事务管理器,就为当前事务方法创建一个TransactionStatus对象,TransactionStatus中的属性记录了物理事务是不是最新的,如果不是最新的,说明当前方法使用的是同一个事务
然后调用prepareTransactionInfo生成也给TransactionInfo对象,并把TransactionStatus对象设置进去,最后把TransactionInfo对象放入到当前线程ThreadLocal对象transactionInfoHolder中
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { // 每个逻辑事务都会创建一个TransactionStatus,但是TransactionStatus中有一个属性代表当前逻辑事务底层的物理事务是不是新的 TransactionStatus status = null; if (txAttr != null) { if (tm != null) { status = tm.getTransaction(txAttr); } } return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }2.2.2 先尝试获取事务(可能为空)
通过getTransaction()方法来获取事务,首先会尝试去当前线程的ThreadLocal中去拿,前面的事务管理器配置的是DataSourceTransactionManager,所以下面看DataSourceTransactionManager中doGetTransaction()方法的实现
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { // Use defaults if no transaction definition given. TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults()); Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); …… }
获取把事务管理器设置的dataSource对象作为Key,调用getResource()来获取数据库连接,然后将得到的数据库连接设置到新生成的DataSourceTransactionObject对象中,通过设置这个连接的状态为false,表示不是最新的物理连接
这时获取的数据库连接可能为空
protected Object doGetTransaction() { DataSourceTransactionObject txObject = new DataSourceTransactionObject(); txObject.setSavepointAllowed(isNestedTransactionAllowed()); ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource()); txObject.setConnectionHolder(conHolder, false); return txObject; }
getResource()方法会去调用doGetResource()方法,在该方法中,resources对象就是一个ThreadLocal,存放的是当前线程的数据库连接,所以在doGetResource()方法中,直接根据key就可以获取数据库连接
private static final ThreadLocal2.2.3 当前没有事务,则新建事务
上面通过doGetTransaction()方法去获取当前线程的事务,返回的一个DataSourceTransactionObject对象,这个对象里面记录数据库连接
接下来,就是要判断是否有数据库连接
Object transaction = doGetTransaction(); // transaction.getConnectionHolder().isTransactionActive() if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. return handleExistingTransaction(def, transaction, debugEnabled); }
isExistingTransaction()方法判断是否有数据库连接也很简单,就是判断前面返回的DataSourceTransactionObject中,它的connectionHolder属性是否为null即可
protected boolean isExistingTransaction(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive()); }
如果当前线程并没有数据库连接,则去创建一个数据库连接。但在开启一个事务之前,要检查一些事务的配置,比如超时时间不能小于-1,如果当前方法定义的事务传播机制为MANDATORY,直接抛异常
在没有事务时,只有REQUIRED、REQUIRES_NEW和NESTED这三种事务传播机制才会去创建新的事务
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout()); } // No existing transaction found -> check propagation behavior to find out how to proceed. if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { // 没有事务需要挂起,不过TransactionSynchronization有可能需要挂起 // suspendedResources表示当前线程被挂起的资源持有对象(数据库连接、TransactionSynchronization) SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def); } try { // 开启事务后,transaction中就会有数据库连接了,并且isTransactionActive为true // 并返回TransactionStatus对象,该对象保存了很多信息,包括被挂起的资源 return startTransaction(def, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error ex) { resume(null, suspendedResources); throw ex; } }
但在创建事务之前,还有一个重要的动作,就是挂起当前事务,即把当前事务信息封装成一个SuspendedResourcesHolder
下面看下suspend()方法在做什么,首先会去判断当前线程是否有TransactionSynchronization,如果有,则需要把TransactionSynchronization相关的配置全部拿出来,然后将事务的活动状态设置为false,表示当前事务不可用,然后生成一个SuspendedResourcesHolder对象暂存当前事务的信息
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException { // synchronizations是一个ThreadLocal> // 我们可以在任何地方通过TransactionSynchronizationManager给当前线程添加TransactionSynchronization, // 如果当前线程存在TransactionSynchronization if (TransactionSynchronizationManager.isSynchronizationActive()) { // 调用TransactionSynchronization的suspend方法,并清空和返回当前线程中所有的TransactionSynchronization对象 List suspendedSynchronizations = doSuspendSynchronization(); try { Object suspendedResources = null; if (transaction != null) { // 挂起事务,把transaction中的Connection清空,并把resources中的key-value进行移除,并返回数据库连接Connection对象 suspendedResources = doSuspend(transaction); } // 获取并清空当前线程中关于TransactionSynchronizationManager的设置 String name = TransactionSynchronizationManager.getCurrentTransactionName(); TransactionSynchronizationManager.setCurrentTransactionName(null); boolean readonly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); TransactionSynchronizationManager.setCurrentTransactionReadOnly(false); Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null); boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false); // 将当前线程中的数据库连接对象、TransactionSynchronization对象、TransactionSynchronizationManager中的设置构造成一个对象 // 表示被挂起的资源持有对象,持有了当前线程中的事务对象、TransactionSynchronization对象 return new SuspendedResourcesHolder( suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive); } catch (RuntimeException | Error ex) { // doSuspend failed - original transaction is still active... doResumeSynchronization(suspendedSynchronizations); throw ex; } } else if (transaction != null) { // Transaction active but no synchronization active. Object suspendedResources = doSuspend(transaction); return new SuspendedResourcesHolder(suspendedResources); } else { // Neither transaction nor synchronization active. return null; } }
如果当前事务不为空,会调用doSuspend()方法把当前事务挂起,也就是从ThreadLocal中把数据库连接移除
protected Object doSuspend(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; txObject.setConnectionHolder(null); return TransactionSynchronizationManager.unbindResource(obtainDataSource()); } // resources是存放数据库连接的ThreadLocal private static Object doUnbindResource(Object actualKey) { Map
TransactionSynchronization是一个什么东西,我们可以通过TransactionSynchronizationManager在任何事务方法中来添加一些TransactionSynchronization,TransactionSynchronization中的这些方法,会在事务的不同阶段执行
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public int getOrder() { return TransactionSynchronization.super.getOrder(); } // 事务挂起时执行 @Override public void suspend() { TransactionSynchronization.super.suspend(); } @Override //事务恢复时执行 public void resume() { TransactionSynchronization.super.resume(); } @Override public void flush() { TransactionSynchronization.super.flush(); } @Override // 提交前执行 public void beforeCommit(boolean readOnly) { TransactionSynchronization.super.beforeCommit(readOnly); } @Override // 回滚前执行 public void beforeCompletion() { TransactionSynchronization.super.beforeCompletion(); } @Override public void afterCommit() { TransactionSynchronization.super.afterCommit(); } @Override public void afterCompletion(int status) { TransactionSynchronization.super.afterCompletion(status); } });
接下来,才是真正地去开启一个事务,调用doBegin()获得一个新的连接,该方法的第一个参数是DataSourceTransactionObject对象,就是前面保存数据库连接的对象
开启一个事务之后,通过prepareSynchronization()方法来初始化TransactionSynchronization
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); // 开启的这个事务的状态信息: // 事务的定义、用来保存数据库连接的对象、是否是新事务,是否是新的TransactionSynchronization DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); // 开始事务 doBegin(transaction, definition); prepareSynchronization(status, definition); return status; }
在doBegin()方法中,会通过DataSource获取一个数据库连接,然后设置DataSourceTransactionObject对象的一些参数,包括数据库连接,是否是新的连接,隔离等级,只读等设置
同时要把数据库连接的AutoCommit设置为false,由Spring来控制事务的提交于回滚
通过prepareTransactionalConnection()方法来设置当前事务为只读事务
最后调用bindResource()方法,将数据库连接添加到当前线程ThreadLoacl的resource中,至此,便开始了一个新的数据库事务
protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; // 如果当前线程中所使用的DataSource还没有创建过数据库连接,就获取一个新的数据库连接 if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { Connection newCon = obtainDataSource().getConnection(); txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); txObject.setReadOnly(definition.isReadOnly()); // Switch to manual commit if necessary. This is very expensive in some JDBC drivers, // so we don't want to do it unnecessarily (for example if we've explicitly // configured the connection pool to set it already). // 设置autocommit为false if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); con.setAutoCommit(false); } prepareTransactionalConnection(con, definition); txObject.getConnectionHolder().setTransactionActive(true); // Bind the connection holder to the thread. if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder()); } }
如果当前事务配置为只读事务,则执行SQL方法,将事务设置为只读
protected void prepareTransactionalConnection(Connection con, TransactionDefinition definition) throws SQLException { if (isEnforceReadOnly() && definition.isReadOnly()) { try (Statement stmt = con.createStatement()) { stmt.executeUpdate("SET TRANSACTION READ ONLY"); } } }2.2.4 当前有事务,根据事务传播机制判断是否需要新建事务
上面介绍了在没有事务时的情况,如果当前线程已经存在事务了,又会怎么处理呢
如果当前ThreadLocal中存在数据库连接,则调用handleExistingTransaction()方法来处理已存在的事务
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); // transaction.getConnectionHolder().isTransactionActive() if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. return handleExistingTransaction(def, transaction, debugEnabled); } …… }
在handleExistingTransaction()方法中,就按照事务传播机制来决定是否新建事务或无事务执行或抛异常
PROPAGATION_NEVER:已经有事务存在,直接抛异常
PROPAGATION_NOT_SUPPORTED:挂起当前事务,以无事务方式运行
PROPAGATION_REQUIRES_NEW:挂起当前事务,创建新的事务
PROPAGATION_NESTED:如果以savePoint来解决这种事务,就在当前事务创建SavePoint,回滚时可以局部回滚;如果不使用SavePoint,就新建一个事务。但PlatformTransactionManager类型的事务管理理器的useSavepointForNestedTransaction()方法固定返回true
如果没有创建新的事务,最后调用prepareTransactionStatus()方法,为当前事务构建一个新的TransactionStatus对象
private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { SuspendedResourcesHolder suspendedResources = suspend(transaction); try { return startTransaction(definition, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } if (useSavepointForNestedTransaction()) { DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); status.createAndHoldSavepoint(); return status; } else { return startTransaction(definition, transaction, debugEnabled, null); } } boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); }
protected boolean useSavepointForNestedTransaction() { return true; }三、提交/回滚事务 3.1 恢复事务
在创建新的事务之前,都会将现在的事务挂起,如果在创建新事务的过程中出现了异常,还需要把这些挂起的事务恢复
或者当前事务已经提交或回滚了,也需要去恢复已经挂起的事务
所有恢复事务的方法都会去调用resume()方法,在该方法中,会去调用doResume()方法,而doResume()方法中,就是直接把之前已经挂起的事务,重新放入到当前线程ThreadLocal的resource中
protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder) throws TransactionException { if (resourcesHolder != null) { Object suspendedResources = resourcesHolder.suspendedResources; if (suspendedResources != null) { doResume(transaction, suspendedResources); } List3.2 回滚事务suspendedSynchronizations = resourcesHolder.suspendedSynchronizations; if (suspendedSynchronizations != null) { TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel); TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly); TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name); doResumeSynchronization(suspendedSynchronizations); } } } protected void doResume(@Nullable Object transaction, Object suspendedResources) { TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources); }
现在我们回到最开始的invokeWithinTransaction()方法,在执行proceedWithInvocation()方法时,如果出现异常,会调用completeTransactionAfterThrowing()方法进行回滚
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) { TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); Object retVal; try { retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { cleanupTransactionInfo(txInfo); } // 提交事务 commitTransactionAfterReturning(txInfo); }
在completeTransactionAfterThrowing()方法中,首先调用rollback()方法判断抛出的异常与@Transactional中回滚规则中定义的异常是否匹配,如果匹配则直接回滚
如果不匹配,说明手动强制回滚,调用下面的commit()方法
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) { if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) { txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); }else { // We don't roll back on this exception. // Will still roll back if TransactionStatus.isRollbackonly() is true. txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } }3.2.1 强制回滚
在commit()方法中,首先会判断当前事务的rollbackOnly属性是否为true
public final void commit(TransactionStatus status) throws TransactionException { DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) { processRollback(defStatus, false); return; } …… }
rollbackOnly的用途用下:
test()方法和testB()方法中的SQL可以正常执行,但调用完testB()方法后,抛出了异常,同时为了能让返回的异常清晰明细,进行捕获,然后构建一个简单的异常信息,因为异常被捕获了,导致事务就可以正常提交。为了能让前面的SQL回滚,我们可以通过setRollbackonly()方法设置当前事务的rollbackOnly为true,这样在提交时,会强制回滚
@Transactional(rollbackFor = Exception.class) public Maptest(){ jdbcTemplate.execute("insert into user values (1,'lizhi',24)"); userService.testB(); try { throw new NullPointerException(); } catch (Exception e) { TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); return new HashMap (); } } @Transactional(rollbackFor = NullPointerException.class) public void testB(){ jdbcTemplate.execute("insert into user values (2,'lizhi',24)"); }
在processRollback()方法中,调用triggerBeforeCompletion()方法,去执行所有TransactionSynchronization的beforeCompletion()方法
如果当前事务有安全点,则直接回滚到安全点
如果当前事务是一个新的事务,则直接回滚
如果当前事务设置了rollbackOnly属性,或者globalRollbackOnParticipationFailure属性为true,就把当前事务的rollbackOnly属性设置为true,当事务提交的时候,会判断,然后进行回滚
globalRollbackOnParticipationFailure属性表示部分异常,全局回滚,默认为true,比如a()方法调用b()方法,如果b()方法出现了异常,a()方法中的SQL也会被回滚
然后调用triggerAfterCompletion()执行所有TransactionSynchronization的afterCompletion()方法
最后调用cleanupAfterCompletion()方法去清除当前事务的信息,以及恢复挂起的事务
private void processRollback(DefaultTransactionStatus status, boolean unexpected) { try { boolean unexpectedRollback = unexpected; try { triggerBeforeCompletion(status); if (status.hasSavepoint()) { status.rollbackToHeldSavepoint(); } else if (status.isNewTransaction()) { doRollback(status); } else { // Participating in larger transaction if (status.hasTransaction()) { if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { doSetRollbackOnly(status); } } } } catch (RuntimeException | Error ex) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw ex; } triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); } finally { cleanupAfterCompletion(status); } }3.3 提交事务
事务提交的核心方法位于processCommit()方法中,都是都是去调用TransactionSynchronization的一些BeforeCommit()和BeforeCompletion()方法
如果当前事务有安全点,提交时,只是释放安全点,只提交安全点之后的SQL
如果当前事务是一个新的事务,直接调用doCommit()方法,调用数据库连接的commit()方法进行提交
private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { boolean unexpectedRollback = false; prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true; if (status.hasSavepoint()) { unexpectedRollback = status.isGlobalRollbackOnly(); status.releaseHeldSavepoint(); } else if (status.isNewTransaction()) { unexpectedRollback = status.isGlobalRollbackOnly(); doCommit(status); } } …… // Trigger afterCommit callbacks, with an exception thrown there // propagated to callers but the transaction still considered as committed. try { triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { // 恢复被挂起的资源到当前线程中 cleanupAfterCompletion(status); } }
注:Spring事务提交和回滚这一块,涉及到事务的传播关系,会出现多层级执行的问题,代码稍微显得有些复杂
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)