- TC:事务管理器
- TM:事务开始服务
- RM:事务参与过程的其他服务
底层实现流程
- TM和RM都连接到我们的事务协调器TC。
- TM和RM服务的数据源都被Seata代理,执行语句的前后会保存两条记录,一条是执行前的记录,一条是执行后的记录,是方便后期可以逆向的生成sql去回滚事务。具体记录存放在seata的undo_log表中。
- TM在使用RPC-Feign远程调用的时候,在ThreadLocal中获取xid。
- RM在请求头中取到该xid,设置到threadLocal中,同时向seata注册本地事务。
- TM将当前本地事务的结果发送给TC,TC最后通知所有分支事务是提交或回滚。
- TM如果调用接口成功之后再抛出异常时,告诉协调者TC,协调者TC再通知到所有分支的事务,根据undo_log逆向回滚事务。
- 最后如果没有任何异常,TM通知TC,TC最后让所有的有该全局xid的undo_log中的记录都删除。
基本实现思路一样,唯一区别在于回滚方式,LCN采用代理数据源加关闭连接,暂时不支持提交本地事务,容易造成数据的死锁。
Seata采用undo_log的形式逆向生成sql,去实现回滚。
核心注解在于@GlobalTransactional
引入seata核心依赖
com.alibaba.cloud >spring-cloud-starter-alibaba-seata2.1.1.RELEASE
先看到spring.factories配置文件
看到com.alibaba.cloud.seata.GlobalTransactionAutoConfiguration,
这个类
点进去,可以看到,这就是初始化seata的一个配置类。
并且注入全局事务扫描器,如下代码
@Bean public GlobalTransactionScanner globalTransactionScanner() { String applicationName = applicationContext.getEnvironment() .getProperty("spring.application.name"); String txServiceGroup = seataProperties.getTxServiceGroup(); if (StringUtils.isEmpty(txServiceGroup)) { txServiceGroup = applicationName + "-seata-service-group"; seataProperties.setTxServiceGroup(txServiceGroup); } return new GlobalTransactionScanner(applicationName, txServiceGroup); }
点到全局事务扫描类中
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
implements InitializingBean, ApplicationContextAware,
DisposableBean
可以看到全局事务扫描类继承/实现了以下的接口,那么分别是做啥用的呢?
首先我们根据SpringBean和Aop的源码不难发现
AbstractAutoProxyCreator就是Aop原生的一个代理类
而InitializingBean属于Spring的Bean中的一个注解,执行动作在Bean实例化之后执行。
因此对于这个接口的回调方法中,就实现初始化的动作
源码如下
@Override public void afterPropertiesSet() { if (disableGlobalTransaction) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Global transaction is disabled."); } return; } initClient(); } private void initClient() { if (LOGGER.isInfoEnabled()) { LOGGER.info("Initializing Global Transaction Clients ... "); } if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) { throw new IllegalArgumentException( "applicationId: " + applicationId + ", txServiceGroup: " + txServiceGroup); } //init TM TMClient.init(applicationId, txServiceGroup); if (LOGGER.isInfoEnabled()) { LOGGER.info( "Transaction Manager Client is initialized. applicationId[" + applicationId + "] txServiceGroup[" + txServiceGroup + "]"); } //init RM RMClient.init(applicationId, txServiceGroup); if (LOGGER.isInfoEnabled()) { LOGGER.info( "Resource Manager is initialized. applicationId[" + applicationId + "] txServiceGroup[" + txServiceGroup + "]"); } if (LOGGER.isInfoEnabled()) { LOGGER.info("Global Transaction Clients are initialized. "); } registerSpringShutdownHook(); }
然后看到可以发现这两行代码
//init TM TMClient.init(applicationId, txServiceGroup); //init RM RMClient.init(applicationId, txServiceGroup);
可以看到,在初始化方法中,就实现了初始化RM、TM然后,注册到TC中。
然后AbstractAutoProxyCreator这个抽象模板又是拿来干啥的呢?
对比下抽象类和这个实现类可以看到,主要是
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey)
重写了父类的这个回调方法。
看到实现类这个方法,
其作用就是,加上@GlobalTransactional这个注解后,当aop创建代理对象的时候,会走这个回调方法,去创建出GlobalTransactionalInterceptor对象。
点进去这个对象发现,哎,实现了MethodInterceptor接口,说明其采用CGLIB代理模式,进行代理,从而进行方法的反射。
Invoke()中就是其会执行的反射方法
具体源码如下
@Override public Object invoke(final MethodInvocation methodInvocation) throws Throwable { Class> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null); Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass); final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod); final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class); final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class); if (globalTransactionalAnnotation != null) { return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation); } else if (globalLockAnnotation != null) { return handleGlobalLock(methodInvocation); } else { return methodInvocation.proceed(); } }
其中获取到两个注解
- GlobalTransactional,判断你方法上有没有加上这个全局事务的注解
- GlobalLock,作为分布式锁
因此直接点到handleGlobalTransaction()方法中
查看return transactionalTemplate.execute()这个execute方法
具体源码如下
public Object execute(TransactionalExecutor business) throws Throwable { // 1. get or create a transaction GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); // 1.1 get transactionInfo TransactionInfo txInfo = business.getTransactionInfo(); if (txInfo == null) { throw new ShouldNeverHappenException("transactionInfo does not exist"); } try { // 2. begin transaction beginTransaction(txInfo, tx); Object rs = null; try { // Do Your Business rs = business.execute(); } catch (Throwable ex) { // 3.the needed business exception to rollback. completeTransactionAfterThrowing(txInfo,tx,ex); throw ex; } // 4. everything is fine, commit. commitTransaction(tx); return rs; } finally { //5. clear triggerAfterCompletion(); cleanUp(); } }
先看第一行代码
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate(); 点进去看一下 private static GlobalTransaction getCurrent() { String xid = RootContext.getXID(); if (xid == null) { return null; } return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant); }
这里,也就是获取到我们一个分布式事务协调器的一个全局id,那是从哪里获取来的呢?
继续点到getXid()
一直点到最后
@LoadLevel(name = "ThreadLocalContextCore", order = Integer.MIN_VALUE) public class ThreadLocalContextCore implements ContextCore { private ThreadLocal
发现,哎,原来是从threadLocal里面获取到这样的一个全局id。
那么如何创建的呢
往回看到
public static GlobalTransaction getCurrentOrCreate() { GlobalTransaction tx = getCurrent(); if (tx == null) { return createNew(); } return tx; } //如果第一次为空时,就走createNew()这个方法去创建,再点到构造方法 DefaultGlobalTransaction() { this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher); }
所以此时并不会去设置xid,而是先置空,等待我们的协调器TC,给我们分发全局XID。
代码继续往下看
TransactionInfo txInfo = business.getTransactionInfo();
这里是拿到我们具体事务的某个方法。
继续往下到
beginTransaction(txInfo, tx);
这个方法中,具体源码如下
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException { try { triggerBeforeBegin(); tx.begin(txInfo.getTimeOut(), txInfo.getName()); triggerAfterBegin(); } catch (TransactionException txe) { throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure); } }
首先tx.begin(txInfo.getTimeOut(), txInfo.getName());为开启事务。
其begin方法如下
@Override public void begin(int timeout, String name) throws TransactionException { if (role != GlobalTransactionRole.Launcher) { check(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Ignore Begin(): just involved in global transaction [" + xid + "]"); } return; } if (xid != null) { throw new IllegalStateException(); } if (RootContext.getXID() != null) { throw new IllegalStateException(); } xid = transactionManager.begin(null, null, name, timeout); status = GlobalStatus.Begin; RootContext.bind(xid); if (LOGGER.isInfoEnabled()) { LOGGER.info("Begin new global transaction [" + xid + "]"); } }
核心创建xid逻辑为
xid = transactionManager.begin(null, null, name, timeout); status = GlobalStatus.Begin; RootContext.bind(xid);
再次之前,必须判断到threadLocal里面的xid非空,不然就直接报错。
然后再连接到TC,去创建xid。
这里面begin源码在点进去看
@Override public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { GlobalBeginRequest request = new GlobalBeginRequest(); request.setTransactionName(name); request.setTimeout(timeout); GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request); if (response.getResultCode() == ResultCode.Failed) { throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg()); } return response.getXid(); }
然后发现,在这里就实现到了,发送请求到TC,去请求获取到xid。
最后请求成功拿到xid后通过
RootContext.bind(xid);把XID设置到threadLocal里面。
到这里就完成了Seata的获取xid过程。
实现类在io.seata.rm.datasource.exec. AbstractDMLbaseExecutor
看到doExecute方法
@Override public T doExecute(Object... args) throws Throwable { AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); if (connectionProxy.getAutoCommit()) { return executeAutoCommitTrue(args); } else { return executeAutoCommitFalse(args); } }
点到executeAutoCommitFalse()方法中
protected T executeAutoCommitFalse(Object[] args) throws Exception { TableRecords beforeImage = beforeImage(); T result = statementCallback.execute(statementProxy.getTargetStatement(), args); TableRecords afterImage = afterImage(beforeImage); prepareUndoLog(beforeImage, afterImage); return result; }
beforeImage和afterImage也就是生成前置和后置镜像,用于反向生成sql语句。
在executeAutoCommitTrue()方法中把自动提交事务设置为了false。因此此时事务不会立马提交。防止事务成功,但是undo_log日志的事务失败,必须保证这两者的数据最终一致性,所以事务不能立马提交,而是先设置为False。
具体源码如下
protected T executeAutoCommitTrue(Object[] args) throws Throwable { AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); try { connectionProxy.setAutoCommit(false); return new LockRetryPolicy(connectionProxy.getTargetConnection()).execute(() -> { T result = executeAutoCommitFalse(args); connectionProxy.commit(); return result; }); } catch (Exception e) { // when exception occur in finally,this exception will lost, so just print it here LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e); if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) { connectionProxy.getTargetConnection().rollback(); } throw e; } finally { ((ConnectionProxy) connectionProxy).getContext().reset(); connectionProxy.setAutoCommit(true); } }
此时设置False,然后在去执行executeAutoCommitFalse()方法,去把undo_log内容记录,完成最后才执行:connectionProxy.commit();去提交事务。
connectionProxy.setAutoCommit(false); return new LockRetryPolicy(connectionProxy.getTargetConnection()).execute(() -> { T result = executeAutoCommitFalse(args); connectionProxy.commit(); return result; });
然后看到executeAutoCommitFalse方法中的prepareUndolog方法
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException { if (beforeImage.getRows().size() == 0 && afterImage.getRows().size() == 0) { return; } ConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETe ? beforeImage : afterImage; String lockKeys = buildLockKey(lockKeyRecords); connectionProxy.appendLockKey(lockKeys); SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage); connectionProxy.appendUndoLog(sqlUndoLog); }
在SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);这里就是记录到undo_log表中事务信息的动作。
因此到现在seata就完成了,前置后置副本在undo_log表中数据的记录。
Seata底层逆向生成sql回滚事务源码如果发起全局事务的服务没有报错的情况下,通知TC协调器,把undo_log这条记录删除,如果失败的话,则查询本地数据库undo_log表,进行逆向生成sql回滚事务。
具体方法在DataSourceManager#branchRollback()方法中
执行成功,删除undolog日志,源码位置
io.seata.rm.datasource.undo. AbstractUndoLogManager # batchDeleteUndoLog()
@Override public void batchDeleteUndoLog(Setxids, Set branchIds, Connection conn) throws SQLException { if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) { return; } int xidSize = xids.size(); int branchIdSize = branchIds.size(); String batchDeleteSql = toBatchDeleteUndoLogSql(xidSize, branchIdSize); PreparedStatement deletePST = null; try { deletePST = conn.prepareStatement(batchDeleteSql); int paramsIndex = 1; for (Long branchId : branchIds) { deletePST.setLong(paramsIndex++,branchId); } for (String xid: xids){ deletePST.setString(paramsIndex++, xid); } int deleteRows = deletePST.executeUpdate(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("batch delete undo log size " + deleteRows); } }catch (Exception e){ if (!(e instanceof SQLException)) { e = new SQLException(e); } throw (SQLException) e; } finally { if (deletePST != null) { deletePST.close(); } } }
String batchDeleteSql = toBatchDeleteUndoLogSql(xidSize, branchIdSize);
这里,就是生成删除undo_log的sql语句
具体实现如下
protected static String toBatchDeleteUndoLogSql(int xidSize, int branchIdSize) { StringBuilder sqlBuilder = new StringBuilder(64); sqlBuilder.append("DELETE FROM ") .append(UNDO_LOG_TABLE_NAME) .append(" WHERe " + ClientTableColumnsName.UNDO_LOG_BRANCH_XID + " IN "); appendInParam(branchIdSize, sqlBuilder); sqlBuilder.append(" AND " + ClientTableColumnsName.UNDO_LOG_XID + " IN "); appendInParam(xidSize, sqlBuilder); return sqlBuilder.toString(); }
这就是服务没有发生报错,数据正常情况,删除undo_log表的记录
如果报错的情况,回滚逻辑源码如下
protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException { String xid = request.getXid(); long branchId = request.getBranchId(); String resourceId = request.getResourceId(); String applicationData = request.getApplicationData(); if (LOGGER.isInfoEnabled()) { LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId); } BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId, applicationData); response.setXid(xid); response.setBranchId(branchId); response.setBranchStatus(status); if (LOGGER.isInfoEnabled()) { LOGGER.info("Branch Rollbacked result: " + status); } }
具体实现是在:
BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,applicationData);
找到DataSourceManager实现类:io.seata.rm.datasource.undo. AbstractUndoLogManager # batchDeleteUndoLog()
源码如下
@Override public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException { DataSourceProxy dataSourceProxy = get(resourceId); if (dataSourceProxy == null) { throw new ShouldNeverHappenException(); } try { UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId); } catch (TransactionException te) { if (LOGGER.isInfoEnabled()) { LOGGER.info("branchRollback failed reason [{}]", te.getMessage()); } if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) { return BranchStatus.PhaseTwo_RollbackFailed_Unretryable; } else { return BranchStatus.PhaseTwo_RollbackFailed_Retryable; } } return BranchStatus.PhaseTwo_Rollbacked; }
这里最主要看到
UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
核心回滚是在undo()方法
先看到
查询方法
selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
具体值定义如下
protected static final String SELECT_UNDO_LOG_SQL = "SELECT * FROM " + UNDO_LOG_TABLE_NAME + " WHERe " + ClientTableColumnsName.UNDO_LOG_BRANCH_XID + " = ? AND " + ClientTableColumnsName.UNDO_LOG_XID + " = ? FOR UPDATE";
回滚的具体源码如下
for (SQLUndoLog sqlUndoLog : sqlUndoLogs) { Tablemeta tablemeta = TablemetaCacheFactory.getTablemetaCache(dataSourceProxy).getTablemeta(dataSourceProxy, sqlUndoLog.getTableName()); sqlUndoLog.setTablemeta(tablemeta); AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor( dataSourceProxy.getDbType(), sqlUndoLog); undoExecutor.executeOn(conn); }
找到sqlUndoLogs列表,然后做遍历,进行逆向生成SQL回滚事务
其中,逆向回滚源码如下
public void executeOn(Connection conn) throws SQLException { if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) { return; } try { String undoSQL = buildUndoSQL(); PreparedStatement undoPST = conn.prepareStatement(undoSQL); TableRecords undoRows = getUndoRows(); for (Row undoRow : undoRows.getRows()) { ArrayListundoValues = new ArrayList<>(); Field pkValue = null; for (Field field : undoRow.getFields()) { if (field.getKeyType() == KeyType.PrimaryKey) { pkValue = field; } else { undoValues.add(field); } } undoPrepare(undoPST, undoValues, pkValue); undoPST.executeUpdate(); } } catch (Exception ex) { if (ex instanceof SQLException) { throw (SQLException) ex; } else { throw new SQLException(ex); } } }
看到这个代码
String undoSQL = buildUndoSQL();
这里生成出来的字符串就是逆向生成的SQL语句,如果之前是insert语句,那么这里就是delete语句。
所以这就是Seata逆向回滚的大致原理。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)