Spring事务执行的原理(@Transactional)

Spring事务执行的原理(@Transactional),第1张

事务执行的原理(@Transactional)

Spring框架默认使用基于CGLIB的AOP动态代理实现,根据前面的CGLIB动态代理原理的分析(Spring动态代理原理)。可以发现,应用启动的时候,在@EnableTransactionManagement注解的作用下会自动生成代理对象CglibAopProxy,并会为方法设置对应的拦截器链;当调用带有@Transactional注解的方法时,DynamicAdvisedInterceptor#intercept能够获取到该方法的拦截器链,由于方法带有事务注解,事务拦截器TransactionInterceptor一定存在于方法拦截器链中,在继续调用ReflectiveMethodInvocation#invoke中会对方法拦截器进行递归调用,事务拦截器TransactionInterceptor自然也会被调用,下面重点看一下该拦截器的执行逻辑。调用事务方法的流程图可以归纳如下:

事务拦截器TransactionInterceptor

TransactionInterceptor继承了TransactionAspectSupport,并且实现了MethodInterceptor接口,因此在调用目标方法之前就会先调用TransactionInterceptor#invoke方法,在方法里面会判断@Transactional注解,获取对应的事务属性,然后在事务的背景下执行目标方法。

TransactionInterceptor事务拦截器的继承关系如下图:

TransactionInterceptor#invoke方法先获取目标类,然后调用父类TransactionAspectSupport#invokeWithinTransaction方法

public Object invoke(MethodInvocation invocation) throws Throwable {
   Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

   // 调用TransactionAspectSupport#invokeWithinTransaction
   return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}

重点看一下TransactionAspectSupport#invokeWithinTransaction方法,他会根据不同的事务类型分别执行对应的处理逻辑,可以支持反应式事务、普通事务和带回调的事务

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
      final InvocationCallback invocation) throws Throwable {

    // 如果事务属性为null,那这个方法是非事务的
   TransactionAttributeSource tas = getTransactionAttributeSource();
    // 通过SpringTransactionAnnotationParser解析是否带有@Transactional注解,如果有,解析注解的属性
   final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    // 确定事务管理器TM
   final TransactionManager tm = determineTransactionManager(txAttr);

    // 反应式事务
   if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
       // 获取反应式事务支持
      ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
         if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
            throw new TransactionUsageException(
                  "Unsupported annotated transaction on suspending function detected: " + method +
                  ". Use TransactionalOperator.transactional extensions instead.");
         }
         ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
         if (adapter == null) {
            throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
                  method.getReturnType());
         }
         return new ReactiveTransactionSupport(adapter);
      });
       // 调用目标方法
      return txSupport.invokeWithinTransaction(method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
   }

    // 转化为平台事务
   PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
    // 识别目标方法
   final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    
   // 标准事务处理
   if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
       // 创建标准事务
      TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
      Object retVal;
      try {
          // 这是一个环绕advice,会在调用链里调用下一个拦截器;他通常会导致目标方法被调用
          // 调用目标方法
         retVal = invocation.proceedWithInvocation();
      } catch (Throwable ex) {
         // target invocation exception
         completeTransactionAfterThrowing(txInfo, ex);
         throw ex;
      } finally {
          // 清除事务信息
         cleanupTransactionInfo(txInfo);
      }
      // 匹配到回滚规则时设置回滚
      if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
         // Set rollback-only in case of Vavr failure matching our rollback rules...
         TransactionStatus status = txInfo.getTransactionStatus();
         if (status != null && txAttr != null) {
            retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
         }
      }
      // 返回后提交事务
      commitTransactionAfterReturning(txInfo);
      return retVal;
       
   } else { // 带回调的事务
      final ThrowableHolder throwableHolder = new ThrowableHolder();

      // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
      try {
         Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
             // 准备事务信息
            TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
            try {
                // 调用目标方法
               Object retVal = invocation.proceedWithInvocation();
                // 匹配到回滚规则时设置回滚
               if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                  // Set rollback-only in case of Vavr failure matching our rollback rules...
                  retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
               }
               return retVal;
            } catch (Throwable ex) {
               if (txAttr.rollbackOn(ex)) {
                  // A RuntimeException: will lead to a rollback.
                  if (ex instanceof RuntimeException) {
                     throw (RuntimeException) ex;
                  } else {
                     throw new ThrowableHolderException(ex);
                  }
               } else {
                  // A normal return value: will lead to a commit.
                  throwableHolder.throwable = ex;
                  return null;
               }
            } finally {
               cleanupTransactionInfo(txInfo);
            }
         });

         // Check result state: It might indicate a Throwable to rethrow.
         if (throwableHolder.throwable != null) {
            throw throwableHolder.throwable;
         }
         return result;
      } catch (ThrowableHolderException ex) {
         throw ex.getCause();
      } catch (TransactionSystemException ex2) {
         if (throwableHolder.throwable != null) {
            logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
            ex2.initApplicationException(throwableHolder.throwable);
         }
         throw ex2;
      } catch (Throwable ex2) {
         if (throwableHolder.throwable != null) {
            logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
         }
         throw ex2;
      }
   }
}

以标准事务执行的过程为例

创建事务

首先调用TransactionAspectSupport#createTransactionIfNecessary创建事务

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
 @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

    // 如果事务名称没有指定,使用方法名字作为事务名称
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    // 从事务管理器中获取事务的状态
    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            status = tm.getTransaction(txAttr);
        } else { // 事务管理器必须要指定
            if (logger.isDebugEnabled()) {
                logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                             "] because no transaction manager has been configured");
            }
        }
    }
    // 准备事务信息
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

通过tm.getTransaction(txAttr)获取事务状态,这里会调用AbstractPlatformTransactionManager#getTransaction方法,具体的处理逻辑可以归纳为下图:

看一下具体的代码实现

@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
    
    // 如果没有事务定义,就是用默认的
   TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

    // 获取事务
   Object transaction = doGetTransaction();
   boolean debugEnabled = logger.isDebugEnabled();

    // 判断当前事务是否存在
   if (isExistingTransaction(transaction)) {
       // 当前已经存在事务->检查事务的传播级别,由此决定下一步如何处理
      return handleExistingTransaction(def, transaction, debugEnabled);
   }

    // 检查事务执行是否超时
   if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
      throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
   }

   if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        // 当前不存在事务,但是事务传播级别为PROPAGATION_MANDATORY时,报错
      throw new IllegalTransactionStateException(
            "No existing transaction found for transaction marked with propagation 'mandatory'");
   } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
         def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
         def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
       // PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED传播级别的处理
       // 挂起事务
      SuspendedResourcesHolder suspendedResources = suspend(null);
      if (debugEnabled) {
         logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
      }
      try {
          // 开启事务
         return startTransaction(def, transaction, debugEnabled, suspendedResources);
      } catch (RuntimeException | Error ex) {
          // 发生异常时,恢复之前挂起的事务
         resume(null, suspendedResources);
         throw ex;
      }
   } else {
      // Create "empty" transaction: no actual transaction, but potentially synchronization.
      if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
         logger.warn("Custom isolation level specified but no actual transaction initiated; " +
               "isolation level will effectively be ignored: " + def);
      }
      boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
      return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
   }
}

AbstractPlatformTransactionManager#suspend方法会挂起给定的事务,首先挂起事务同步器,然后代理到doSuspend模板方法

@Nullable
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    // 判断事务同步是否激活,这里使用的是ThreadLocal线程本地变量
   if (TransactionSynchronizationManager.isSynchronizationActive()) {
       // 挂起同步器
      List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
      try {
         Object suspendedResources = null;
         if (transaction != null) {
             // 挂原来的事务,这个 *** 作一般是由DataSourceTransactionManager#doSuspend执行具体的挂起 *** 作
            suspendedResources = doSuspend(transaction);
         }
          // 获取原来事务的名称、只读标志、隔离级别、活跃状态等信息,将其放入SuspendedResourcesHolder中,方便后续恢复使用
          // 当前事务名称
         String name = TransactionSynchronizationManager.getCurrentTransactionName();
         TransactionSynchronizationManager.setCurrentTransactionName(null);
          // 当前事务只读标志
         boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
         TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
          // 当前事务隔离级别
         Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
         TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
          // 当前事务的激活状态
         boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
         TransactionSynchronizationManager.setActualTransactionActive(false);
          // 返回挂起的资源
         return new SuspendedResourcesHolder(
               suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
      } catch (RuntimeException | Error ex) {
          // 挂起事务失败,恢复事务同步
         doResumeSynchronization(suspendedSynchronizations);
         throw ex;
      }
   } else if (transaction != null) { // 给定事务不为空
       // 事务是活跃的,但是同步器不是活跃的。挂起原来的事务
      Object suspendedResources = doSuspend(transaction);
        // 返回挂起的资源
      return new SuspendedResourcesHolder(suspendedResources);
   } else { // 给定事务和同步器都不是活跃的
      return null;
   }
}

DataSourceTransactionManager#doSuspend执行具体的挂起 *** 作,他会在事务同步器TransactionSynchronizationManager中将数据库资源解绑

@Override
protected Object doSuspend(Object transaction) {
   DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
   txObject.setConnectionHolder(null);
   return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}

AbstractPlatformTransactionManager#startTransaction可以启动新事务,它主要完成开启新事务和初始化事务同步器等工作

private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
      boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {

    // 与线程绑定的事务同步标志
   boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    // 事务状态
   DefaultTransactionStatus status = newTransactionStatus(
         definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    // 开启新事物
   doBegin(transaction, definition);
    // 初始化事务同步器
   prepareSynchronization(status, definition);
   return status;
}

DataSourceTransactionManager#doBegin主要是设置数据库连接,将自动提交事务改为手动提交事务,并且绑定连接holder到当前线程

@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
   DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
   Connection con = null;

   try {
       // 没有数据库连接或者需要事务同步时,设置新的数据库连接信息
      if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
         Connection newCon = obtainDataSource().getConnection();
         if (logger.isDebugEnabled()) {
            logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
         }
         txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
      }

      txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
      con = txObject.getConnectionHolder().getConnection();

       // 设置之前的事务隔离级别和事务只读标识
      Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
      txObject.setPreviousIsolationLevel(previousIsolationLevel);
      txObject.setReadOnly(definition.isReadOnly());

      // 如果数据库连接设置为自动提交,切换为手动提交
      if (con.getAutoCommit()) {
         txObject.setMustRestoreAutoCommit(true);
         if (logger.isDebugEnabled()) {
            logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
         }
         con.setAutoCommit(false);
      }

       // 准备事务连接
      prepareTransactionalConnection(con, definition);
      txObject.getConnectionHolder().setTransactionActive(true);

       // 获取事务超时设置
      int timeout = determineTimeout(definition);
      if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
         txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
      }

       // 绑定连接holder到当前线程
      if (txObject.isNewConnectionHolder()) {
         TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
      }
   } catch (Throwable ex) {
      if (txObject.isNewConnectionHolder()) { // 新连接时,释放连接
         DataSourceUtils.releaseConnection(con, obtainDataSource());
         txObject.setConnectionHolder(null, false);
      }
      throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
   }
}

AbstractPlatformTransactionManager#prepareSynchronization主要是对事务同步管理器TransactionSynchronizationManager进行初始化设置

protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
   if (status.isNewSynchronization()) {
      TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
      TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
            definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
                  definition.getIsolationLevel() : null);
      TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
      TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
      TransactionSynchronizationManager.initSynchronization();
   }
}

TransactionAspectSupport#prepareTransactionInfo可以设置事务的状态,并且将事务和线程绑定起来

protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
      @Nullable TransactionAttribute txAttr, String joinpointIdentification,
      @Nullable TransactionStatus status) {

   TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
   if (txAttr != null) {
      if (logger.isTraceEnabled()) {
         logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
      }
       // 设置新的事务状态
      txInfo.newTransactionStatus(status);
   } else {
      if (logger.isTraceEnabled()) {
         logger.trace("No need to create transaction for [" + joinpointIdentification +
               "]: This method is not transactional.");
      }
   }
    // 将事务信息绑定到线程,即使没有创建新的事务
   txInfo.bindToThread();
   return txInfo;
}
调用下一个拦截器

创建事务之后,invocation.proceedWithInvocation()会继续调用下一个拦截器,最后调用到目标方法

恢复老事务信息

事务方法调用完成之后,意味着当前事务已经结束,需要恢复之前的老事务信息,使用cleanupTransactionInfo来实现恢复

protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) {
   if (txInfo != null) {
      txInfo.restoreThreadLocalStatus();
   }
}

这里的线程信息是保存在ThreadLocal中的, 这样保障了线程安全

private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
      new NamedThreadLocal<>("Current aspect-driven transaction");

// 使用stack栈来恢复老的事务信息; 如果没有老的事务,会被设置为null
private void restoreThreadLocalStatus() {
    transactionInfoHolder.set(this.oldTransactionInfo);
}
提交事务

在TransactionAspectSupport#commitTransactionAfterReturning方法中,调用成功后执行提交事务步骤,异常被处理时不执行;没有事务的时候什么都不做

	protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
		if (txInfo != null && txInfo.getTransactionStatus() != null) {
			if (logger.isTraceEnabled()) {
				logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
			}
			txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
		}
	}

异常处理

当目标方法调用出现异常的时候,TransactionAspectSupport#completeTransactionAfterThrowing可以对事务进行回滚

protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
   if (txInfo != null && txInfo.getTransactionStatus() != null) {
      if (logger.isTraceEnabled()) {
         logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +"] after exception: " + ex);
      }
      if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
          // 设置了回滚异常类型
         try {
             // 回滚处理
            txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
         } catch (TransactionSystemException ex2) {
            logger.error("Application exception overridden by rollback exception", ex);
            ex2.initApplicationException(ex);
            throw ex2;
         } catch (RuntimeException | Error ex2) {
            logger.error("Application exception overridden by rollback exception", ex);
            throw ex2;
         }
      } else {
          // 在这个异常上不回滚;如果TransactionStatus.isRollbackOnly()=true,还是会回滚
         try {
             // 提交事务
            txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
         } catch (TransactionSystemException ex2) {
            logger.error("Application exception overridden by commit exception", ex);
            ex2.initApplicationException(ex);
            throw ex2;
         } catch (RuntimeException | Error ex2) {
            logger.error("Application exception overridden by commit exception", ex);
            throw ex2;
         }
      }
   }
}
事务管理器TM

Spring中的事务管理器定义了事务的开启、提交和回滚等基础接口,TransactionManager的继承关系如下图:

  • AbstractPlatformTransactionManager:实现Spring标准事务工作流的抽象基类,为具体的平台事务管理器(如:JtaTransactionManager)提供基础服务,可以提供的功能有:

    • 判断是否存在事务;
    • 应用合适的事务传播行为
    • 必要时挂起或者恢复事务
    • 在事务回滚时引用适当的修改(实际的回滚或者设置仅回滚)
    • 触发注册的同步回调(如果事务同步是激活的)

    他的子类必须为具体的事务状态实现具体的模板方法,如:begin,suspend,commit,rollback等。其中的抽象方法必须实现,其他的方法有默认实现,可以覆写

  • DataSourceTransactionManager:是单一JDBC数据源的实现。只要设置了使用DataSource作为连接工厂,这个类可以在任何有JDBC驱动的环境下工作。它可以绑定特定的DataSource到当前的线程,还可能允许一个线程绑定一个DataSource连接。

  • JdbcTransactionManager:他是普通 DataSourceTransactionManager的与JdbcAccessor对其的子类。会为提交和回滚步骤添加一般的JDBC异常转换。一般会和org.springframework.jdbc.core.JdbcTemplate联合使用,JdbcTemplate默认使用相同的基础 SQLExceptionTranslator

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/917263.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存