参考官方文档或者其他博主,这里就掠过了
2、事务依赖、事务管理器依赖的添加事务的使用必须建立在事务管理器上。spring给我们已经配置好了事务管理器,我们只需要将其添加到我们的ioc容器中即可
依赖的添加
org.springframework spring-tx5.2.11.RELEASE org.springframework spring-jdbc5.2.11.RELEASE
事务管理器是基于TransactionManager这个接口的实现,我们使用spring-jdbc包下面的DataSourceTransactionManager来做事务管理。而我们后面的所有的事务源码分析也都是分析这个类
3、注解版事务的开启大多数人都会使用xml文件来配置相关事务管理器,并且开启事务。这里我们来使用注解方式来开启事务。
这里直接贴上我的配置类的代码吧
@ComponentScans({@ComponentScan("com.aop.*"),@ComponentScan("com.aop")}) @EnableTransactionManagement public class AopConfig { //将事务管理器注入到ioc容器中 @Bean public TransactionManager transactionManager(){ DataSource dataSource = MyBatisConfig.getDataSource(); return new DataSourceTransactionManager(dataSource); } }
这里我们多了一个注解:@EnableTransactionManagement!
这个注解是一个非常重要的注解!他跟我们前面所解析的@EnableAspectJAutoProxy是一个作用:为相关的组件创建代理对象,收集增将器(Advisor),为代理对象与advisor创建关联关系。
这一块我们就不一步一步去调试他的创建、收集过程了。类似我前面分析的aop去看就好。我们只来分析他的使用。
4、事务的使用在我们所需要使用事务的方法上标注@Transaction就可以了。注意:第三步所说的哪些都是基于@Transaction这个注解去做的。也就是收集@Transaction注解标注的方法,为其创建advisor,为所在的类创建代理对象等。
二、事务增强源码解析这里依旧是cglib代理对象,在第一个callback里面有我们的增强器,看图说话
这里就直接看代理对象的invoke方法了。注意:所有的增强器都是代理对象,调用增强方法都是调用代理对象的invoke方法!!!
事务代理方法的执行
TransactionInterceptor 46 line @Nullable public Object invoke(MethodInvocation invocation) throws Throwable { //获取目标类的信息 也就是添加的事务的类 Class> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null; //获取目标方法 Method var10001 = invocation.getMethod(); invocation.getClass(); //事务的处理 return this.invokeWithinTransaction(var10001, targetClass, invocation::proceed); }
事务处理
TransactionAspectSupport 140 line protected Object invokeWithinTransaction(Method method, @Nullable Class> targetClass, TransactionAspectSupport.InvocationCallback invocation) throws Throwable { //获取事务的属性(这里面规定了只能是public!) TransactionAttributeSource tas = this.getTransactionAttributeSource(); //校验方法是否是公共的,并且获取事务相关规则(传播机制、异常返回等) TransactionAttribute txAttr = tas != null ? tas.getTransactionAttribute(method, targetClass) : null; //获取事务管理器 TransactionManager tm = this.determineTransactionManager(txAttr); //我们这一块的if判断是不通过的 看else if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) { TransactionAspectSupport.ReactiveTransactionSupport txSupport = (TransactionAspectSupport.ReactiveTransactionSupport)this.transactionSupportCache.computeIfAbsent(method, (key) -> { if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && TransactionAspectSupport.KotlinDelegate.isSuspend(method)) { throw new TransactionUsageException("Unsupported annotated transaction on suspending function detected: " + method + ". Use TransactionalOperator.transactional extensions instead."); } else { ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType()); if (adapter == null) { throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " + method.getReturnType()); } else { return new TransactionAspectSupport.ReactiveTransactionSupport(adapter); } } }); return txSupport.invokeWithinTransaction(method, targetClass, invocation, txAttr, (ReactiveTransactionManager)tm); } else { //将事务管理器强转下 PlatformTransactionManager ptm = this.asPlatformTransactionManager(tm); //这里拿到的是标注事务的方法 String joinpointIdentification = this.methodIdentification(method, targetClass, txAttr); //事务管理器不属于CallbackPreferringPlatformTransactionManager类型,直接走else if (txAttr != null && ptm instanceof CallbackPreferringPlatformTransactionManager) { TransactionAspectSupport.ThrowableHolder throwableHolder = new TransactionAspectSupport.ThrowableHolder(); Object result; try { result = ((CallbackPreferringPlatformTransactionManager)ptm).execute(txAttr, (statusx) -> { TransactionAspectSupport.TransactionInfo txInfo = this.prepareTransactionInfo(ptm, txAttr, joinpointIdentification, statusx); Object var9; try { Object retVal = invocation.proceedWithInvocation(); if (retVal != null && vavrPresent && TransactionAspectSupport.VavrDelegate.isVavrTry(retVal)) { retVal = TransactionAspectSupport.VavrDelegate.evaluateTryFailure(retVal, txAttr, statusx); } var9 = retVal; return var9; } catch (Throwable var13) { if (txAttr.rollbackOn(var13)) { if (var13 instanceof RuntimeException) { throw (RuntimeException)var13; } throw new TransactionAspectSupport.ThrowableHolderException(var13); } throwableHolder.throwable = var13; var9 = null; } finally { this.cleanupTransactionInfo(txInfo); } return var9; }); } catch (TransactionAspectSupport.ThrowableHolderException var20) { throw var20.getCause(); } catch (TransactionSystemException var21) { if (throwableHolder.throwable != null) { this.logger.error("Application exception overridden by commit exception", throwableHolder.throwable); var21.initApplicationException(throwableHolder.throwable); } throw var21; } catch (Throwable var22) { if (throwableHolder.throwable != null) { this.logger.error("Application exception overridden by commit exception", throwableHolder.throwable); } throw var22; } if (throwableHolder.throwable != null) { throw throwableHolder.throwable; } else { return result; } } else { //走这里,获取事务相关信息 看2.1 TransactionAspectSupport.TransactionInfo txInfo = this.createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); Object retVal; try { //执行真正的业务逻辑 也就是我们的service的方法 并且拿到返回值 retVal = invocation.proceedWithInvocation(); } catch (Throwable var18) { //执行出异常 会走这一块哦 看2.3 this.completeTransactionAfterThrowing(txInfo, var18); throw var18; } finally { //清空下info数据 也就是将threadlocal中保存的线程设置为null this.cleanupTransactionInfo(txInfo); } if (retVal != null && vavrPresent && TransactionAspectSupport.VavrDelegate.isVavrTry(retVal)) { TransactionStatus status = txInfo.getTransactionStatus(); if (status != null && txAttr != null) { retVal = TransactionAspectSupport.VavrDelegate.evaluateTryFailure(retVal, txAttr, status); } } //进行事务的提交 2.4 this.commitTransactionAfterReturning(txInfo); return retVal; } } }2.1 获取事务相关信息
protected TransactionAspectSupport.TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { //包装下 if (txAttr != null && ((TransactionAttribute)txAttr).getName() == null) { txAttr = new DelegatingTransactionAttribute((TransactionAttribute)txAttr) { public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { //获取事务的状态 看2.2 status = tm.getTransaction((TransactionDefinition)txAttr); } else if (this.logger.isDebugEnabled()) { this.logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } } //创建出事务info对象 这里就是简单的对象创建 不存在其他东西的处理 return this.prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status); }2.2 获取事务的状态
AbstractPlatformTransactionManager 108 line public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { TransactionDefinition def = definition != null ? definition : TransactionDefinition.withDefaults(); Object transaction = this.doGetTransaction(); boolean debugEnabled = this.logger.isDebugEnabled(); //是否已经存在事务 if (this.isExistingTransaction(transaction)) { //走已有事务的逻辑 todo 后面分析 return this.handleExistingTransaction(def, transaction, debugEnabled); } else if (def.getTimeout() < -1) { throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout()); } else if (def.getPropagationBehavior() == 2) { throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'"); //这里的三个状态 0 3 6 代表的是什么?? 后面有时间了再分析吧 } else if (def.getPropagationBehavior() != 0 && def.getPropagationBehavior() != 3 && def.getPropagationBehavior() != 6) { if (def.getIsolationLevel() != -1 && this.logger.isWarnEnabled()) { this.logger.warn("Custom isolation level specified but no actual transaction initiated; isolation level will effectively be ignored: " + def); } boolean newSynchronization = this.getTransactionSynchronization() == 0; return this.prepareTransactionStatus(def, (Object)null, true, newSynchronization, debugEnabled, (Object)null); } else { //进入else 看这里 AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources = this.suspend((Object)null); try { //开始事务 看 2.2.1 return this.startTransaction(def, transaction, debugEnabled, suspendedResources); } catch (Error | RuntimeException var7) { this.resume((Object)null, suspendedResources); throw var7; } } }
2.2.1开启事务
AbstractPlatformTransactionManager 140 line private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources) { boolean newSynchronization = this.getTransactionSynchronization() != 2; //初始化一个status DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); //这里面做开始事务的准备工作 其中有一点: 执行了这个sql:SET TRANSACTION READ ONLY this.doBegin(transaction, definition); //设置相关状态等属性值 this.prepareSynchronization(status, definition); return status; }2.3 异常下的处理
TransactionAspectSupport 358 line protected void completeTransactionAfterThrowing(@Nullable TransactionAspectSupport.TransactionInfo txInfo, Throwable ex) { if (txInfo != null && txInfo.getTransactionStatus() != null) { //判断回滚的异常是不是我们出现的这个异常 if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) { try { //回滚 看2.3.1 txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); } catch (TransactionSystemException var6) { this.logger.error("Application exception overridden by rollback exception", ex); var6.initApplicationException(ex); throw var6; } catch (Error | RuntimeException var7) { this.logger.error("Application exception overridden by rollback exception", ex); throw var7; } } else { try { //不是的话进行事务提交 有异常往上抛 txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } catch (TransactionSystemException var4) { this.logger.error("Application exception overridden by commit exception", ex); var4.initApplicationException(ex); throw var4; } catch (Error | RuntimeException var5) { this.logger.error("Application exception overridden by commit exception", ex); throw var5; } } } }2.3.1 处理事务回滚
private void processRollback(DefaultTransactionStatus status, boolean unexpected) { try { boolean unexpectedRollback = unexpected; try { this.triggerBeforeCompletion(status); if (status.hasSavepoint()) { if (status.isDebug()) { this.logger.debug("Rolling back transaction to savepoint"); } status.rollbackToHeldSavepoint(); } else if (status.isNewTransaction()) { if (status.isDebug()) { this.logger.debug("Initiating transaction rollback"); } //获取连接 调用 con.rollback(); this.doRollback(status); } else { if (status.hasTransaction()) { if (!status.isLocalRollbackOnly() && !this.isGlobalRollbackOnParticipationFailure()) { if (status.isDebug()) { this.logger.debug("Participating transaction failed - letting transaction originator decide on rollback"); } } else { if (status.isDebug()) { this.logger.debug("Participating transaction failed - marking existing transaction as rollback-only"); } this.doSetRollbackOnly(status); } } else { this.logger.debug("Should roll back transaction but cannot - no transaction available"); } if (!this.isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = false; } } } catch (Error | RuntimeException var8) { this.triggerAfterCompletion(status, 2); throw var8; } this.triggerAfterCompletion(status, 1); if (unexpectedRollback) { throw new UnexpectedRollbackException("Transaction rolled back because it has been marked as rollback-only"); } } finally { this.cleanupAfterCompletion(status); } }2.4 事务的提交
TransactionAspectSupport 349 line protected void commitTransactionAfterReturning(@Nullable TransactionAspectSupport.TransactionInfo txInfo) { if (txInfo != null && txInfo.getTransactionStatus() != null) { //提交 提交的时候会再次判断是否需要回滚,需要的话走上面的回滚逻辑 //不需要 获取连接 调con.commit(); txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } }
这一块事务看的很粗糙 很粗糙 很粗糙!! 后面有时间再仔细分析把~~~
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)