Spring框架默认使用基于CGLIB的AOP动态代理实现,根据前面的CGLIB动态代理原理的分析(Spring动态代理原理)。可以发现,应用启动的时候,在@EnableTransactionManagement注解的作用下会自动生成代理对象CglibAopProxy,并会为方法设置对应的拦截器链;当调用带有@Transactional注解的方法时,DynamicAdvisedInterceptor#intercept能够获取到该方法的拦截器链,由于方法带有事务注解,事务拦截器TransactionInterceptor一定存在于方法拦截器链中,在继续调用ReflectiveMethodInvocation#invoke中会对方法拦截器进行递归调用,事务拦截器TransactionInterceptor自然也会被调用,下面重点看一下该拦截器的执行逻辑。调用事务方法的流程图可以归纳如下:
TransactionInterceptor继承了TransactionAspectSupport,并且实现了MethodInterceptor接口,因此在调用目标方法之前就会先调用TransactionInterceptor#invoke方法,在方法里面会判断@Transactional注解,获取对应的事务属性,然后在事务的背景下执行目标方法。
TransactionInterceptor事务拦截器的继承关系如下图:
TransactionInterceptor#invoke方法先获取目标类,然后调用父类TransactionAspectSupport#invokeWithinTransaction方法
public Object invoke(MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// 调用TransactionAspectSupport#invokeWithinTransaction
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
重点看一下TransactionAspectSupport#invokeWithinTransaction方法,他会根据不同的事务类型分别执行对应的处理逻辑,可以支持反应式事务、普通事务和带回调的事务
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 如果事务属性为null,那这个方法是非事务的
TransactionAttributeSource tas = getTransactionAttributeSource();
// 通过SpringTransactionAnnotationParser解析是否带有@Transactional注解,如果有,解析注解的属性
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 确定事务管理器TM
final TransactionManager tm = determineTransactionManager(txAttr);
// 反应式事务
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
// 获取反应式事务支持
ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
throw new TransactionUsageException(
"Unsupported annotated transaction on suspending function detected: " + method +
". Use TransactionalOperator.transactional extensions instead.");
}
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
if (adapter == null) {
throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
method.getReturnType());
}
return new ReactiveTransactionSupport(adapter);
});
// 调用目标方法
return txSupport.invokeWithinTransaction(method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
}
// 转化为平台事务
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
// 识别目标方法
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// 标准事务处理
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// 创建标准事务
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// 这是一个环绕advice,会在调用链里调用下一个拦截器;他通常会导致目标方法被调用
// 调用目标方法
retVal = invocation.proceedWithInvocation();
} catch (Throwable ex) {
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
} finally {
// 清除事务信息
cleanupTransactionInfo(txInfo);
}
// 匹配到回滚规则时设置回滚
if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
// 返回后提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
} else { // 带回调的事务
final ThrowableHolder throwableHolder = new ThrowableHolder();
// It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
try {
Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
// 准备事务信息
TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
try {
// 调用目标方法
Object retVal = invocation.proceedWithInvocation();
// 匹配到回滚规则时设置回滚
if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
return retVal;
} catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
} else {
throw new ThrowableHolderException(ex);
}
} else {
// A normal return value: will lead to a commit.
throwableHolder.throwable = ex;
return null;
}
} finally {
cleanupTransactionInfo(txInfo);
}
});
// Check result state: It might indicate a Throwable to rethrow.
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
} catch (ThrowableHolderException ex) {
throw ex.getCause();
} catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
} catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
}
throw ex2;
}
}
}
以标准事务执行的过程为例
创建事务首先调用TransactionAspectSupport#createTransactionIfNecessary创建事务
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// 如果事务名称没有指定,使用方法名字作为事务名称
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
// 从事务管理器中获取事务的状态
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
status = tm.getTransaction(txAttr);
} else { // 事务管理器必须要指定
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// 准备事务信息
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
通过tm.getTransaction(txAttr)获取事务状态,这里会调用AbstractPlatformTransactionManager#getTransaction方法,具体的处理逻辑可以归纳为下图:
看一下具体的代码实现
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
// 如果没有事务定义,就是用默认的
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// 获取事务
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 判断当前事务是否存在
if (isExistingTransaction(transaction)) {
// 当前已经存在事务->检查事务的传播级别,由此决定下一步如何处理
return handleExistingTransaction(def, transaction, debugEnabled);
}
// 检查事务执行是否超时
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
// 当前不存在事务,但是事务传播级别为PROPAGATION_MANDATORY时,报错
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
} else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED传播级别的处理
// 挂起事务
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
// 开启事务
return startTransaction(def, transaction, debugEnabled, suspendedResources);
} catch (RuntimeException | Error ex) {
// 发生异常时,恢复之前挂起的事务
resume(null, suspendedResources);
throw ex;
}
} else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
AbstractPlatformTransactionManager#suspend方法会挂起给定的事务,首先挂起事务同步器,然后代理到doSuspend模板方法
@Nullable
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
// 判断事务同步是否激活,这里使用的是ThreadLocal线程本地变量
if (TransactionSynchronizationManager.isSynchronizationActive()) {
// 挂起同步器
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
// 挂原来的事务,这个 *** 作一般是由DataSourceTransactionManager#doSuspend执行具体的挂起 *** 作
suspendedResources = doSuspend(transaction);
}
// 获取原来事务的名称、只读标志、隔离级别、活跃状态等信息,将其放入SuspendedResourcesHolder中,方便后续恢复使用
// 当前事务名称
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
// 当前事务只读标志
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
// 当前事务隔离级别
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
// 当前事务的激活状态
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
// 返回挂起的资源
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
} catch (RuntimeException | Error ex) {
// 挂起事务失败,恢复事务同步
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
} else if (transaction != null) { // 给定事务不为空
// 事务是活跃的,但是同步器不是活跃的。挂起原来的事务
Object suspendedResources = doSuspend(transaction);
// 返回挂起的资源
return new SuspendedResourcesHolder(suspendedResources);
} else { // 给定事务和同步器都不是活跃的
return null;
}
}
DataSourceTransactionManager#doSuspend执行具体的挂起 *** 作,他会在事务同步器TransactionSynchronizationManager中将数据库资源解绑
@Override
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
AbstractPlatformTransactionManager#startTransaction可以启动新事务,它主要完成开启新事务和初始化事务同步器等工作
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
// 与线程绑定的事务同步标志
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 事务状态
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 开启新事物
doBegin(transaction, definition);
// 初始化事务同步器
prepareSynchronization(status, definition);
return status;
}
DataSourceTransactionManager#doBegin主要是设置数据库连接,将自动提交事务改为手动提交事务,并且绑定连接holder到当前线程
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
// 没有数据库连接或者需要事务同步时,设置新的数据库连接信息
if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
// 设置之前的事务隔离级别和事务只读标识
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
// 如果数据库连接设置为自动提交,切换为手动提交
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
// 准备事务连接
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
// 获取事务超时设置
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// 绑定连接holder到当前线程
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
} catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) { // 新连接时,释放连接
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
AbstractPlatformTransactionManager#prepareSynchronization主要是对事务同步管理器TransactionSynchronizationManager进行初始化设置
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
}
}
TransactionAspectSupport#prepareTransactionInfo可以设置事务的状态,并且将事务和线程绑定起来
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// 设置新的事务状态
txInfo.newTransactionStatus(status);
} else {
if (logger.isTraceEnabled()) {
logger.trace("No need to create transaction for [" + joinpointIdentification +
"]: This method is not transactional.");
}
}
// 将事务信息绑定到线程,即使没有创建新的事务
txInfo.bindToThread();
return txInfo;
}
调用下一个拦截器
创建事务之后,invocation.proceedWithInvocation()会继续调用下一个拦截器,最后调用到目标方法
恢复老事务信息事务方法调用完成之后,意味着当前事务已经结束,需要恢复之前的老事务信息,使用cleanupTransactionInfo来实现恢复
protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) {
if (txInfo != null) {
txInfo.restoreThreadLocalStatus();
}
}
这里的线程信息是保存在ThreadLocal中的, 这样保障了线程安全
private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
new NamedThreadLocal<>("Current aspect-driven transaction");
// 使用stack栈来恢复老的事务信息; 如果没有老的事务,会被设置为null
private void restoreThreadLocalStatus() {
transactionInfoHolder.set(this.oldTransactionInfo);
}
提交事务
在TransactionAspectSupport#commitTransactionAfterReturning方法中,调用成功后执行提交事务步骤,异常被处理时不执行;没有事务的时候什么都不做
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
异常处理
当目标方法调用出现异常的时候,TransactionAspectSupport#completeTransactionAfterThrowing可以对事务进行回滚
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +"] after exception: " + ex);
}
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
// 设置了回滚异常类型
try {
// 回滚处理
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
} catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by rollback exception", ex);
ex2.initApplicationException(ex);
throw ex2;
} catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by rollback exception", ex);
throw ex2;
}
} else {
// 在这个异常上不回滚;如果TransactionStatus.isRollbackOnly()=true,还是会回滚
try {
// 提交事务
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
} catch (TransactionSystemException ex2) {
logger.error("Application exception overridden by commit exception", ex);
ex2.initApplicationException(ex);
throw ex2;
} catch (RuntimeException | Error ex2) {
logger.error("Application exception overridden by commit exception", ex);
throw ex2;
}
}
}
}
事务管理器TM
Spring中的事务管理器定义了事务的开启、提交和回滚等基础接口,TransactionManager的继承关系如下图:
-
AbstractPlatformTransactionManager:实现Spring标准事务工作流的抽象基类,为具体的平台事务管理器(如:JtaTransactionManager)提供基础服务,可以提供的功能有:
- 判断是否存在事务;
- 应用合适的事务传播行为
- 必要时挂起或者恢复事务
- 在事务回滚时引用适当的修改(实际的回滚或者设置仅回滚)
- 触发注册的同步回调(如果事务同步是激活的)
他的子类必须为具体的事务状态实现具体的模板方法,如:begin,suspend,commit,rollback等。其中的抽象方法必须实现,其他的方法有默认实现,可以覆写
-
DataSourceTransactionManager:是单一JDBC数据源的实现。只要设置了使用DataSource作为连接工厂,这个类可以在任何有JDBC驱动的环境下工作。它可以绑定特定的DataSource到当前的线程,还可能允许一个线程绑定一个DataSource连接。
-
JdbcTransactionManager:他是普通 DataSourceTransactionManager的与JdbcAccessor对其的子类。会为提交和回滚步骤添加一般的JDBC异常转换。一般会和org.springframework.jdbc.core.JdbcTemplate联合使用,JdbcTemplate默认使用相同的基础 SQLExceptionTranslator
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)