Seata服务端(TC)源码分析

Seata服务端(TC)源码分析,第1张

Seata服务端(TC)源码分析 1. TC介绍

Seata是由TM,RM,TC。 其中TM和RM是在客户端,TC是事务协调者是需要单独部署的服务端。

  • TC (Transaction Coordinator) - 事务协调者 维护全局和分支事务的状态,驱动全局事务提交或回滚。
  • TM (Transaction Manager) - 事务管理器(发起者,同时也是RM的一种) 定义全局事务的范围:开始全局事务、提交或回滚全局事务。
  • RM (Resource Manager) - 资源管理器(每个参与事务的微服务) 管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
2. TC服务入口

TC是一个Spring Boot应用,单独部署的,所以和其他的springboot应用一样,会有一个main函数作为启动入口。

这里我们看Server.java这里就是启动入口,在这个入口中找到协调者,因为TC整体的 *** 作就是协调整体的全局事务

public static void main(String[] args) throws IOException {
    // 通过线程池去接收请求
    ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
            NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
            new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()),
             new ThreadPoolExecutor.CallerRunsPolicy());
    //通过Netty去接收请求
    NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
    ...
    //通过DefaultCoordinator来处理各种请求
    DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
    ...
    //初始化Server
    nettyRemotingServer.init();
    ...
    //主线程不退出
    System.exit(0);
}
3. 事务协调器

从代码看出DefaultCoordinator就是事务协调器了,也是用来处理各种事务请求的

public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
    ...
    @Override
    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
        throws TransactionException {
        ...
        //开始全局事务
    }
    @Override
    protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
        throws TransactionException {
        //提交全局事务
    }
    @Override
    protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response,
                                    RpcContext rpcContext) throws TransactionException {
        //全局事务回滚
    }
    @Override
    protected void doGlobalStatus(GlobalStatusRequest request, GlobalStatusResponse response, RpcContext rpcContext)
        throws TransactionException {
        //全局事务状态
    }
    @Override
    protected void doGlobalReport(GlobalReportRequest request, GlobalReportResponse response, RpcContext rpcContext)
        throws TransactionException {
        //全局事务反馈
    }
    @Override
    protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,
                                    RpcContext rpcContext) throws TransactionException {
        //分支事务注册
    }
    @Override
    protected void doBranchReport(BranchReportRequest request, BranchReportResponse response, RpcContext rpcContext)
        throws TransactionException {
        //分支事务反馈
    }
    ...
    //下面还有很多请求处理的方法,就不一一列举了
  }
}

4. 创建全局事务

这里我们看一下doGlobalBegin方法是如何处理的

@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
    throws TransactionException {
    response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
        request.getTransactionName(), request.getTimeout()));
    ...
}

从代码可以看出,调用了core.begin来获得xid然后返回给客户端

那我们参考core.begin方法来看一下

// DefaultCore.java
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    // 创建全局事务Session
    GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
        timeout);
    MDC.put(RootContext.MDC_KEY_XID, session.getXid());
    // 为Session中添加回调监听 SessionHolder.getRootSessionManager()
    // 去获取一个全局Session管理器DataBaseSessionManager
    // 观察者设计模式
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

    //全局事务开启
    session.begin();

    // transaction start event
    eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
        session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));

    return session.getXid();
}

再向下我们要关注一下全局Session管理器DataBaseSessionManager,进入到getRootSessionManager()方法中

//SessionHolder.java
public static SessionManager getRootSessionManager() {
    if (ROOT_SESSION_MANAGER == null) {
        throw new ShouldNeverHappenException("SessionManager is NOT init!");
    }
    return ROOT_SESSION_MANAGER;
}

// SessionHolder.java
//这个管理器如何生成的那,我们可以看一下init初始化方法
public static void init(String mode) {
    ...
    StoreMode storeMode = StoreMode.get(mode);
    if (StoreMode.DB.equals(storeMode)) {
        // 通过SPI机制读取SessionManager接口实现类,读取的是META-INF.service目录
        // 在通过反射机制创建对象DataBaseSessionManager
        ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
        ...
    } else if (StoreMode.FILE.equals(storeMode)) {
        ...
    } else if (StoreMode.REDIS.equals(storeMode)) {
        ...
    } else {
        // unknown store
        ...
    }
    reload(storeMode);
}

从代码可以看出使用了观察者模式,当Session有变化时调用,从而触发去存储相关信息, SessionHolder.init方法是在main函数启动时执行的。如果是DB模式,是通过SPI去读取SessionManager的子类来处理的,默认情况下是:io.seata.server.storage.db.session.DataBaseSessionManager

接下来我们关注开启全局事务方法DefaultCore.begin方法中的 session.begin();

//GlobalSession.java
@Override
public void begin() throws TransactionException {
    // 声明全局事务开始
    this.status = GlobalStatus.Begin;
    // 开始时间
    this.beginTime = System.currentTimeMillis();
    // 激活全局事务
    this.active = true;
    // 将SessionManager放入到集合中,调用onBegin方法
    for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
        lifecycleListener.onBegin(this);
    }
}

这里就触发了监听器的onBegin方法,那对应的DataBaseSessionManager也会有相应的实现以响应,调用了onBegin方法后,会通过抽象实现掉到addGlobalSession方法

//DataBaseSessionManager.java
@Override
public void addGlobalSession(GlobalSession session) throws TransactionException {
    if (StringUtils.isBlank(taskName)) {
        // 保存global_session
        boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
        ...
    } else {
        // 更新global_session
        boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
        ...
    }
}

然后我们来看transactionStoreManager.writeSession方法是如何写入的

// DataBaseTransactionStoreManager.java
@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
    // 第一次进入一定是写入
    if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
        return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
        return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
        return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
        return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
        return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
        return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else {
        throw new StoreException("Unknown LogOperation:" + logOperation.name());
    }
}

因为我们第一次调用一定是写入,所以此时我们应该查看insertGlobalTransactionDO,此方法的作用就是写入全局事务表中global_table

// LogStoreDataBaseDAO.java
@Override
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
    String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
    Connection conn = null;
    PreparedStatement ps = null;
    try {
        conn = logStoreDataSource.getConnection();
        conn.setAutoCommit(true);
        ps = conn.prepareStatement(sql);
        ps.setString(1, globalTransactionDO.getXid());
        ps.setLong(2, globalTransactionDO.getTransactionId());
        ps.setInt(3, globalTransactionDO.getStatus());
        ps.setString(4, globalTransactionDO.getApplicationId());
        ps.setString(5, globalTransactionDO.getTransactionServiceGroup());
        String transactionName = globalTransactionDO.getTransactionName();
        transactionName = transactionName.length() > transactionNameColumnSize ? transactionName.substring(0,
                                                                                                           transactionNameColumnSize) : transactionName;
        ps.setString(6, transactionName);
        ps.setInt(7, globalTransactionDO.getTimeout());
        ps.setLong(8, globalTransactionDO.getBeginTime());
        ps.setString(9, globalTransactionDO.getApplicationData());
        return ps.executeUpdate() > 0;
    } catch (SQLException e) {
        throw new StoreException(e);
    } finally {
        IOUtil.close(ps, conn);
    }
}

我们可以查看GlobalTransactionDO实体类的属性,和global_table 的字段进行比对,就能看出开启全局事务以后,把全局事务作为记录写入到了global_table表中了。

5. 图解调用流程

Seata-源码分析图 | ProcessOn免费在线作图,在线流程图,在线思维导图 |

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/741468.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-28
下一篇 2022-04-28

发表评论

登录后才能评论

评论列表(0条)

保存