Spring 事务底层源码解析(下)

Spring 事务底层源码解析(下),第1张

Spring 事务底层源码解析(下) 一、Spring 事务执行流程

我们在使用Spring事务机制时,需要在配置类创建一个事务管理器的实例,用于处理事务提交、回滚、关闭等 *** 作

定义一个DataSourceTransactionManager实例的事务管理器

@Bean
public PlatformTransactionManager transactionManager() {
   DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
   transactionManager.setDataSource(dataSource());
   return transactionManager;
}

@Bean
public DataSource dataSource() {
   DriverManagerDataSource dataSource = new DriverManagerDataSource();
   dataSource.setUrl("jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf-8&useSSL=false");
   dataSource.setUsername("root");
   dataSource.setPassword("ROOT");
   return dataSource;
}

在详细介绍Spring事务执行的源码之前,先大概介绍一个事务执行的大概流程,其中重点介绍一下Spring事务的传播机制

1.1 基本流程

在上面生成Advise实例中,生成的TransactionInterceptor对象其实就是一个methodInterceptor对象,在执行事务方法时,会调用TransactionInterceptor对象的invoke()方法,基本执行流程为:

  • 利用所配置的PlatformTransactionManager事务管理器新建一个数据库连接
  • 修改数据库连接的autocommit为false,这样才能由Spring来控制提交或回滚逻辑
  • 执行MethodInvocation.proceed()方法,简单理解就是去执行事务方法,其中就会执行SQL
  • 如果没有异常,则提交
  • 如果抛出异常,就回滚
1.2 Spring事务传播机制

然而在真正的开发工作中,肯定还会出现方法之间的调用,以a()方法和b()方法为例,a()方法中调用b()方法,那么a()方法中的事务是否也可以用在b()方法中呢?

针对这个问题,Spring提供了事务传播机制,用来解决方法调用时,事务的作用域问题

下面以一个简单的例子介绍一个Spring事务传播机制的实现过程,假设a()方法和b()方法都使用了@Transactional注解,a()方法在一个事务中执行,调用b()方法时,需要重新开一个事务执行:

  • 代理对象执行a()方法之前,需要先利用事务管理器新建一个数据库连接conn
  • 将数据库连接conn的autocommit改为false
  • 把数据库连接conn设置到ThreadLocal中,(这样可以保证同一个线程中,用的都是用一个数据库连接)
  • 执行a()方法中的SQL
  • 执行a()方法过程中,调用b()方法(注意用代理对象调用,否则事务不生效)
    • 代理对象执行b()方法前,首先判断当前线程已经存在一个数据库连接conn,表示当前线程已经拥有了一个Spring事务,则将该事务挂起
    • 挂起就是把ThreadLoacl中的数据库连接conn从ThreadLocal中移除,并放入到一个挂起资源对象
    • 挂起完成后,再利用事务管理器新建一个数据库连接reconn
    • 将数据库连接reconn的autocommit设置为false
    • 把数据库连接reconn设置到ThreadLocal中
    • 执行b()方法中的SQL
    • b()方法执行完成之后,则从ThreadLocal中拿到数据库连接reconn进行提交
    • 提交完成之后恢复所挂起的数据库连接conn,这里的恢复就是把在挂起资源对象中所保存的数据库连接conn再次设置到ThreadLocal中
  • a()方法正常执行完,则从ThreadLocal中拿到数据库连接conn进行提交

注:在执行某方方法时,判断当前是否存在一个事务,就是判断当前线程的ThreadLocal中是否存在一个数据库连接对象,如果存在则表示以及存在一个事务了

如果以非事务运行时,表示在执行方法时,Spring事务管理器不会去建立数据库连接,执行SQL时,由Mybatis或JdbcTemplate建立数据库连接来执行SQL

Spring提供了7种传播机制:

  • REQUIRED:没有事务则新建,有则直接在该事务上执行
  • SUPPORTS:如果有事务,直接在该事务上运行,如果没有事务,就以非事务方式运行
  • MANDATORY:如果没有事务,直接抛异常;如果有事务,直接在当前事务运行
  • REQUIRES_NEW:创建一个新的事务,并挂起当前的事务
  • NOT_SUPPORTED:以非事务方式运行,如果存在事务,则把当前事务挂起
  • NEVER:以非事务方式运行,如果存在事务,则抛出异常
  • NESTED:同MySQL的savepoint,如果存在事务,则在事务内部运行(可以单独回滚),如果没有事务则新建事务
二、开启Spring事务

在前面已经说过,在执行事务方法时,会调用TransactionInterceptor的invoke()方法,这里使用匿名内部类创建了一个CoroutinesInvocationCallback对象,而该对象是在开始事务之后,才会去调用里面的proceedWithInvocation()方法,而在invokeWithinTransaction()方法里面,会去创建事务等

public Object invoke(MethodInvocation invocation) throws Throwable {
   // Work out the target class: may be {@code null}.
   // The TransactionAttributeSource should be passed the target class
   // as well as the method, which may be from an interface.
   Class targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

   // Adapt to TransactionAspectSupport's invokeWithinTransaction...
   return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
      @Override
      @Nullable
      public Object proceedWithInvocation() throws Throwable {
         return invocation.proceed();
      }
      @Override
      public Object getTarget() {
         return invocation.getThis();
      }
      @Override
      public Object[] getArguments() {
         return invocation.getArguments();
      }
   });
}
2.1 获取事务管理器

在《Spring 事务源码分析(上)》的生成Advise实例时,设置了TransactionAttributeSource实例,这个实例更像是一个工具类,在各种解析的时候都需要用到它

在invokeWithinTransaction()方法内部,首先就是获取TransactionAttributeSource实例,然后调用它的getTransactionAttribute()方法,将method方法上@Transactional注解的配置生成一个TransactionAttribute对象,这个对象包含了事务的所有配置

接着就是调用determineTransactionManager()去获取一个事务管理器实例

protected Object invokeWithinTransaction(Method method, @Nullable Class targetClass,
                                         final InvocationCallback invocation) throws Throwable {

    // If the transaction attribute is null, the method is non-transactional.
    // TransactionAttribute就是@Transactional中的配置
    TransactionAttributeSource tas = getTransactionAttributeSource();
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);

    final TransactionManager tm = determineTransactionManager(txAttr);
    ……
}

在文章的开始位置,定义了一个DataSourceTransactionManager事务管理器对象,在@Transactional注解的value()和transactionManager()配置可以指定事务管理器的名称,在获取事务管理器时,首先根据配置来获取;如果没有配置,则直接调用getBean()方法,通过指定TransactionManager类型,获取事务管理器的实例,最后进行缓存

protected TransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
   ……
   // @Transactional注解指定的事务管理器名称
   String qualifier = txAttr.getQualifier();
   if (StringUtils.hasText(qualifier)) {
      return determineQualifiedTransactionManager(this.beanFactory, qualifier);
   }
   else if (StringUtils.hasText(this.transactionManagerBeanName)) {
      return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
   }
   else {
      TransactionManager defaultTransactionManager = getTransactionManager();
      if (defaultTransactionManager == null) {
         defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY);
         if (defaultTransactionManager == null) {
             // 直接从容器获取TransactionManager 事务管理器实例
            defaultTransactionManager = this.beanFactory.getBean(TransactionManager.class);
            this.transactionManagerCache.putIfAbsent(
                  DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
         }
      }
      return defaultTransactionManager;
   }
}
2.2 创建事务

获取事务管理器之后,就可以创建事务了,但在创建事务之前,需要去生成一个JoinPoint,通常把方法名作为连接点

在这里创建事务时,返回可能是现有的事务,也可能是一个新的事务,这与方法上定义的事务传播机制有关

所以这里把TransactionInfo理解成是一个逻辑事务,事务创建完成之后,就会去回调proceedWithInvocation()方法,这个方法可能是去执行下一个MethodInterceptor,也可能就是直接调用事务方法

执行过程中过程中出现异常,会调用completeTransactionAfterThrowing()方法回滚,最后从ThreadLocal中清除当前事务,最后就是调用commitTransactionAfterReturning()方法去提交事务

PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
// joinpoint的唯一标识
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
    // Standard transaction demarcation with getTransaction and commit/rollback calls.
    // 如果有必要就创建事务,这里就涉及到事务传播机制的实现了
    // TransactionInfo表示一个逻辑事务,比如两个逻辑事务属于同一个物理事务
    TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

    Object retVal;
    try {
        // This is an around advice: Invoke the next interceptor in the chain.
        // This will normally result in a target object being invoked.
        // 执行下一个Interceptor或被代理对象中的方法
        retVal = invocation.proceedWithInvocation();
    }
    catch (Throwable ex) {
        // target invocation exception
        completeTransactionAfterThrowing(txInfo, ex);
        throw ex;
    }
    finally {
        cleanupTransactionInfo(txInfo);
    }
	……
    // 提交事务
    commitTransactionAfterReturning(txInfo);
    return retVal;
}

下面重点介绍createTransactionIfNecessary()方法是如何创建事务的

2.2.1 创建TransactionStatus

如果有@Transactional注解,并且也有事务管理器,就为当前事务方法创建一个TransactionStatus对象,TransactionStatus中的属性记录了物理事务是不是最新的,如果不是最新的,说明当前方法使用的是同一个事务

然后调用prepareTransactionInfo生成也给TransactionInfo对象,并把TransactionStatus对象设置进去,最后把TransactionInfo对象放入到当前线程ThreadLocal对象transactionInfoHolder中

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
      @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

   // 每个逻辑事务都会创建一个TransactionStatus,但是TransactionStatus中有一个属性代表当前逻辑事务底层的物理事务是不是新的
   TransactionStatus status = null;
   if (txAttr != null) {
      if (tm != null) {
         status = tm.getTransaction(txAttr);
      }
   }
   return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
2.2.2 先尝试获取事务(可能为空)

通过getTransaction()方法来获取事务,首先会尝试去当前线程的ThreadLocal中去拿,前面的事务管理器配置的是DataSourceTransactionManager,所以下面看DataSourceTransactionManager中doGetTransaction()方法的实现

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
    throws TransactionException {

    // Use defaults if no transaction definition given.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();
    ……
}

获取把事务管理器设置的dataSource对象作为Key,调用getResource()来获取数据库连接,然后将得到的数据库连接设置到新生成的DataSourceTransactionObject对象中,通过设置这个连接的状态为false,表示不是最新的物理连接

这时获取的数据库连接可能为空

protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    ConnectionHolder conHolder =
        (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

getResource()方法会去调用doGetResource()方法,在该方法中,resources对象就是一个ThreadLocal,存放的是当前线程的数据库连接,所以在doGetResource()方法中,直接根据key就可以获取数据库连接

private static final ThreadLocal> resources =
    new NamedThreadLocal<>("Transactional resources");

private static Object doGetResource(Object actualKey) {
    // resources是一个ThreadLocal包装的Map,用来缓存资源的,比如缓存当前线程中由某个DataSource所创建的数据库连接
    Map map = resources.get();
    if (map == null) {
        return null;
    }

    // 获取DataSource对象所对应的数据库连接对象
    Object value = map.get(actualKey);
    // Transparently remove ResourceHolder that was marked as void...
    if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
        map.remove(actualKey);
        // Remove entire ThreadLocal if empty...
        if (map.isEmpty()) {
            resources.remove();
        }
        value = null;
    }
    return value;
}
2.2.3 当前没有事务,则新建事务

上面通过doGetTransaction()方法去获取当前线程的事务,返回的一个DataSourceTransactionObject对象,这个对象里面记录数据库连接

接下来,就是要判断是否有数据库连接

Object transaction = doGetTransaction();

// transaction.getConnectionHolder().isTransactionActive()
if (isExistingTransaction(transaction)) {
   // Existing transaction found -> check propagation behavior to find out how to behave.
   return handleExistingTransaction(def, transaction, debugEnabled);
}

isExistingTransaction()方法判断是否有数据库连接也很简单,就是判断前面返回的DataSourceTransactionObject中,它的connectionHolder属性是否为null即可

protected boolean isExistingTransaction(Object transaction) {
   DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
   return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}

如果当前线程并没有数据库连接,则去创建一个数据库连接。但在开启一个事务之前,要检查一些事务的配置,比如超时时间不能小于-1,如果当前方法定义的事务传播机制为MANDATORY,直接抛异常

在没有事务时,只有REQUIRED、REQUIRES_NEW和NESTED这三种事务传播机制才会去创建新的事务

if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
    throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}

// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
    throw new IllegalTransactionStateException(
        "No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
         def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
         def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    // 没有事务需要挂起,不过TransactionSynchronization有可能需要挂起
    // suspendedResources表示当前线程被挂起的资源持有对象(数据库连接、TransactionSynchronization)
    SuspendedResourcesHolder suspendedResources = suspend(null);
    if (debugEnabled) {
        logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
    }
    try {
        // 开启事务后,transaction中就会有数据库连接了,并且isTransactionActive为true
        // 并返回TransactionStatus对象,该对象保存了很多信息,包括被挂起的资源
        return startTransaction(def, transaction, debugEnabled, suspendedResources);
    }
    catch (RuntimeException | Error ex) {
        resume(null, suspendedResources);
        throw ex;
    }
}

但在创建事务之前,还有一个重要的动作,就是挂起当前事务,即把当前事务信息封装成一个SuspendedResourcesHolder

下面看下suspend()方法在做什么,首先会去判断当前线程是否有TransactionSynchronization,如果有,则需要把TransactionSynchronization相关的配置全部拿出来,然后将事务的活动状态设置为false,表示当前事务不可用,然后生成一个SuspendedResourcesHolder对象暂存当前事务的信息

protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
   // synchronizations是一个ThreadLocal>
   // 我们可以在任何地方通过TransactionSynchronizationManager给当前线程添加TransactionSynchronization,

   // 如果当前线程存在TransactionSynchronization
   if (TransactionSynchronizationManager.isSynchronizationActive()) {
      // 调用TransactionSynchronization的suspend方法,并清空和返回当前线程中所有的TransactionSynchronization对象
      List suspendedSynchronizations = doSuspendSynchronization();
      try {
         Object suspendedResources = null;
         if (transaction != null) {
            // 挂起事务,把transaction中的Connection清空,并把resources中的key-value进行移除,并返回数据库连接Connection对象
            suspendedResources = doSuspend(transaction);
         }

         // 获取并清空当前线程中关于TransactionSynchronizationManager的设置
         String name = TransactionSynchronizationManager.getCurrentTransactionName();
         TransactionSynchronizationManager.setCurrentTransactionName(null);
         boolean readonly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
         TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
         Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
         TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
         boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
         TransactionSynchronizationManager.setActualTransactionActive(false);

         // 将当前线程中的数据库连接对象、TransactionSynchronization对象、TransactionSynchronizationManager中的设置构造成一个对象
         // 表示被挂起的资源持有对象,持有了当前线程中的事务对象、TransactionSynchronization对象
         return new SuspendedResourcesHolder(
               suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
      }
      catch (RuntimeException | Error ex) {
         // doSuspend failed - original transaction is still active...
         doResumeSynchronization(suspendedSynchronizations);
         throw ex;
      }
   }
   else if (transaction != null) {
      // Transaction active but no synchronization active.
      Object suspendedResources = doSuspend(transaction);
      return new SuspendedResourcesHolder(suspendedResources);
   }
   else {
      // Neither transaction nor synchronization active.
      return null;
   }
}

如果当前事务不为空,会调用doSuspend()方法把当前事务挂起,也就是从ThreadLocal中把数据库连接移除

protected Object doSuspend(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    txObject.setConnectionHolder(null);
    return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
// resources是存放数据库连接的ThreadLocal
private static Object doUnbindResource(Object actualKey) {
    Map map = resources.get();
    if (map == null) {
        return null;
    }
    Object value = map.remove(actualKey);
    // Remove entire ThreadLocal if empty...
    if (map.isEmpty()) {
        resources.remove();
    }
    // Transparently suppress a ResourceHolder that was marked as void...
    if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
        value = null;
    }
    return value;
}

TransactionSynchronization是一个什么东西,我们可以通过TransactionSynchronizationManager在任何事务方法中来添加一些TransactionSynchronization,TransactionSynchronization中的这些方法,会在事务的不同阶段执行

TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
   @Override
   public int getOrder() {
      return TransactionSynchronization.super.getOrder();
   }

   // 事务挂起时执行
   @Override
   public void suspend() {
      TransactionSynchronization.super.suspend();
   }

   @Override //事务恢复时执行
   public void resume() {
      TransactionSynchronization.super.resume();
   }

   @Override
   public void flush() {
      TransactionSynchronization.super.flush();
   }

   @Override // 提交前执行
   public void beforeCommit(boolean readOnly) {
      TransactionSynchronization.super.beforeCommit(readOnly);
   }

   @Override // 回滚前执行
   public void beforeCompletion() {
      TransactionSynchronization.super.beforeCompletion();
   }

   @Override
   public void afterCommit() {
      TransactionSynchronization.super.afterCommit();
   }

   @Override
   public void afterCompletion(int status) {
      TransactionSynchronization.super.afterCompletion(status);
   }
});

接下来,才是真正地去开启一个事务,调用doBegin()获得一个新的连接,该方法的第一个参数是DataSourceTransactionObject对象,就是前面保存数据库连接的对象

开启一个事务之后,通过prepareSynchronization()方法来初始化TransactionSynchronization

private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
      boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {

   boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);

   // 开启的这个事务的状态信息:
   // 事务的定义、用来保存数据库连接的对象、是否是新事务,是否是新的TransactionSynchronization
   DefaultTransactionStatus status = newTransactionStatus(
         definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
   // 开始事务
   doBegin(transaction, definition);
   prepareSynchronization(status, definition);
   return status;
}

在doBegin()方法中,会通过DataSource获取一个数据库连接,然后设置DataSourceTransactionObject对象的一些参数,包括数据库连接,是否是新的连接,隔离等级,只读等设置

同时要把数据库连接的AutoCommit设置为false,由Spring来控制事务的提交于回滚

通过prepareTransactionalConnection()方法来设置当前事务为只读事务

最后调用bindResource()方法,将数据库连接添加到当前线程ThreadLoacl的resource中,至此,便开始了一个新的数据库事务

protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    // 如果当前线程中所使用的DataSource还没有创建过数据库连接,就获取一个新的数据库连接
    if (!txObject.hasConnectionHolder() ||
        txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
        Connection newCon = obtainDataSource().getConnection();
        txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
    }

    txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
    con = txObject.getConnectionHolder().getConnection();

    Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
    txObject.setPreviousIsolationLevel(previousIsolationLevel);
    txObject.setReadOnly(definition.isReadOnly());

    // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
    // so we don't want to do it unnecessarily (for example if we've explicitly
    // configured the connection pool to set it already).
    // 设置autocommit为false
    if (con.getAutoCommit()) {
        txObject.setMustRestoreAutoCommit(true);
        con.setAutoCommit(false);
    }

    prepareTransactionalConnection(con, definition);
    txObject.getConnectionHolder().setTransactionActive(true);

    // Bind the connection holder to the thread.
    if (txObject.isNewConnectionHolder()) {
        TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
    }

}

如果当前事务配置为只读事务,则执行SQL方法,将事务设置为只读

protected void prepareTransactionalConnection(Connection con, TransactionDefinition definition)
      throws SQLException {

   if (isEnforceReadOnly() && definition.isReadOnly()) {
      try (Statement stmt = con.createStatement()) {
         stmt.executeUpdate("SET TRANSACTION READ ONLY");
      }
   }
}
2.2.4 当前有事务,根据事务传播机制判断是否需要新建事务

上面介绍了在没有事务时的情况,如果当前线程已经存在事务了,又会怎么处理呢

如果当前ThreadLocal中存在数据库连接,则调用handleExistingTransaction()方法来处理已存在的事务

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
    throws TransactionException {

    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();

    // transaction.getConnectionHolder().isTransactionActive()
    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(def, transaction, debugEnabled);
    }
    ……
}

在handleExistingTransaction()方法中,就按照事务传播机制来决定是否新建事务或无事务执行或抛异常

PROPAGATION_NEVER:已经有事务存在,直接抛异常

PROPAGATION_NOT_SUPPORTED:挂起当前事务,以无事务方式运行

PROPAGATION_REQUIRES_NEW:挂起当前事务,创建新的事务

PROPAGATION_NESTED:如果以savePoint来解决这种事务,就在当前事务创建SavePoint,回滚时可以局部回滚;如果不使用SavePoint,就新建一个事务。但PlatformTransactionManager类型的事务管理理器的useSavepointForNestedTransaction()方法固定返回true

如果没有创建新的事务,最后调用prepareTransactionStatus()方法,为当前事务构建一个新的TransactionStatus对象

private TransactionStatus handleExistingTransaction(
    TransactionDefinition definition, Object transaction, boolean debugEnabled)
    throws TransactionException {

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
            "Existing transaction found for transaction marked with propagation 'never'");
    }

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(
            definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            return startTransaction(definition, transaction, debugEnabled, suspendedResources);
        }
        catch (RuntimeException | Error beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                "Transaction manager does not allow nested transactions by default - " +
                "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (useSavepointForNestedTransaction()) {
            DefaultTransactionStatus status =
                prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
        }
        else {
            return startTransaction(definition, transaction, debugEnabled, null);
        }
    }

    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
protected boolean useSavepointForNestedTransaction() {
   return true;
}
三、提交/回滚事务 3.1 恢复事务

在创建新的事务之前,都会将现在的事务挂起,如果在创建新事务的过程中出现了异常,还需要把这些挂起的事务恢复

或者当前事务已经提交或回滚了,也需要去恢复已经挂起的事务

所有恢复事务的方法都会去调用resume()方法,在该方法中,会去调用doResume()方法,而doResume()方法中,就是直接把之前已经挂起的事务,重新放入到当前线程ThreadLocal的resource中

protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
    throws TransactionException {

    if (resourcesHolder != null) {
        Object suspendedResources = resourcesHolder.suspendedResources;
        if (suspendedResources != null) {
            doResume(transaction, suspendedResources);
        }
        List suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
        if (suspendedSynchronizations != null) {
            TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
            TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
            doResumeSynchronization(suspendedSynchronizations);
        }
    }
}

protected void doResume(@Nullable Object transaction, Object suspendedResources) {
    TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}
3.2 回滚事务

现在我们回到最开始的invokeWithinTransaction()方法,在执行proceedWithInvocation()方法时,如果出现异常,会调用completeTransactionAfterThrowing()方法进行回滚

if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
    TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

    Object retVal;
    try {
        retVal = invocation.proceedWithInvocation();
    }
    catch (Throwable ex) {
        // target invocation exception
        completeTransactionAfterThrowing(txInfo, ex);
        throw ex;
    }
    finally {
        cleanupTransactionInfo(txInfo);
    }

    // 提交事务
    commitTransactionAfterReturning(txInfo);
}

在completeTransactionAfterThrowing()方法中,首先调用rollback()方法判断抛出的异常与@Transactional中回滚规则中定义的异常是否匹配,如果匹配则直接回滚

如果不匹配,说明手动强制回滚,调用下面的commit()方法

protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
    if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
        txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
    }else {
        // We don't roll back on this exception.
        // Will still roll back if TransactionStatus.isRollbackonly() is true.
        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    }
}
3.2.1 强制回滚

在commit()方法中,首先会判断当前事务的rollbackOnly属性是否为true

public final void commit(TransactionStatus status) throws TransactionException {
    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    if (defStatus.isLocalRollbackOnly()) {
        processRollback(defStatus, false);
        return;
    }
    ……
}

rollbackOnly的用途用下:

test()方法和testB()方法中的SQL可以正常执行,但调用完testB()方法后,抛出了异常,同时为了能让返回的异常清晰明细,进行捕获,然后构建一个简单的异常信息,因为异常被捕获了,导致事务就可以正常提交。为了能让前面的SQL回滚,我们可以通过setRollbackonly()方法设置当前事务的rollbackOnly为true,这样在提交时,会强制回滚

@Transactional(rollbackFor = Exception.class)
public Map test(){
    jdbcTemplate.execute("insert into user values (1,'lizhi',24)");
    userService.testB();
    try {
        throw new NullPointerException();
    } catch (Exception e) {
        TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
        return new HashMap();
    }
}

@Transactional(rollbackFor = NullPointerException.class)
public void testB(){
    jdbcTemplate.execute("insert into user values (2,'lizhi',24)");
}

在processRollback()方法中,调用triggerBeforeCompletion()方法,去执行所有TransactionSynchronization的beforeCompletion()方法

如果当前事务有安全点,则直接回滚到安全点

如果当前事务是一个新的事务,则直接回滚

如果当前事务设置了rollbackOnly属性,或者globalRollbackOnParticipationFailure属性为true,就把当前事务的rollbackOnly属性设置为true,当事务提交的时候,会判断,然后进行回滚

globalRollbackOnParticipationFailure属性表示部分异常,全局回滚,默认为true,比如a()方法调用b()方法,如果b()方法出现了异常,a()方法中的SQL也会被回滚

然后调用triggerAfterCompletion()执行所有TransactionSynchronization的afterCompletion()方法

最后调用cleanupAfterCompletion()方法去清除当前事务的信息,以及恢复挂起的事务

private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
    try {
        boolean unexpectedRollback = unexpected;

        try {
            triggerBeforeCompletion(status);

            if (status.hasSavepoint()) {
                status.rollbackToHeldSavepoint();
            }
            else if (status.isNewTransaction()) {
                doRollback(status);
            }
            else {
                // Participating in larger transaction
                if (status.hasTransaction()) {
                    if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                        doSetRollbackOnly(status);
                    }
                }
            }
        }
        catch (RuntimeException | Error ex) {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw ex;
        }

        triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
    }
    finally {
        cleanupAfterCompletion(status);
    }
}
3.3 提交事务

事务提交的核心方法位于processCommit()方法中,都是都是去调用TransactionSynchronization的一些BeforeCommit()和BeforeCompletion()方法

如果当前事务有安全点,提交时,只是释放安全点,只提交安全点之后的SQL

如果当前事务是一个新的事务,直接调用doCommit()方法,调用数据库连接的commit()方法进行提交

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;

        try {
            boolean unexpectedRollback = false;
            prepareForCommit(status);
            triggerBeforeCommit(status);
            triggerBeforeCompletion(status);
            beforeCompletionInvoked = true;

            if (status.hasSavepoint()) {
                unexpectedRollback = status.isGlobalRollbackOnly();
                status.releaseHeldSavepoint();
            }
            else if (status.isNewTransaction()) {
                unexpectedRollback = status.isGlobalRollbackOnly();
                doCommit(status);
            }
        }
        ……

        // Trigger afterCommit callbacks, with an exception thrown there
        // propagated to callers but the transaction still considered as committed.
        try {
            triggerAfterCommit(status);
        }
        finally {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }

    }
    finally {
        // 恢复被挂起的资源到当前线程中
        cleanupAfterCompletion(status);
    }
}

注:Spring事务提交和回滚这一块,涉及到事务的传播关系,会出现多层级执行的问题,代码稍微显得有些复杂

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5672113.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存