Seata是由TM,RM,TC。 其中TM和RM是在客户端,TC是事务协调者是需要单独部署的服务端。
- TC (Transaction Coordinator) - 事务协调者 维护全局和分支事务的状态,驱动全局事务提交或回滚。
- TM (Transaction Manager) - 事务管理器(发起者,同时也是RM的一种) 定义全局事务的范围:开始全局事务、提交或回滚全局事务。
- RM (Resource Manager) - 资源管理器(每个参与事务的微服务) 管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
TC是一个Spring Boot应用,单独部署的,所以和其他的springboot应用一样,会有一个main函数作为启动入口。
这里我们看Server.java这里就是启动入口,在这个入口中找到协调者,因为TC整体的 *** 作就是协调整体的全局事务
public static void main(String[] args) throws IOException {
// 通过线程池去接收请求
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()),
new ThreadPoolExecutor.CallerRunsPolicy());
//通过Netty去接收请求
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
...
//通过DefaultCoordinator来处理各种请求
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
...
//初始化Server
nettyRemotingServer.init();
...
//主线程不退出
System.exit(0);
}
3. 事务协调器
从代码看出DefaultCoordinator就是事务协调器了,也是用来处理各种事务请求的
public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
...
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
throws TransactionException {
...
//开始全局事务
}
@Override
protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
throws TransactionException {
//提交全局事务
}
@Override
protected void doGlobalRollback(GlobalRollbackRequest request, GlobalRollbackResponse response,
RpcContext rpcContext) throws TransactionException {
//全局事务回滚
}
@Override
protected void doGlobalStatus(GlobalStatusRequest request, GlobalStatusResponse response, RpcContext rpcContext)
throws TransactionException {
//全局事务状态
}
@Override
protected void doGlobalReport(GlobalReportRequest request, GlobalReportResponse response, RpcContext rpcContext)
throws TransactionException {
//全局事务反馈
}
@Override
protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,
RpcContext rpcContext) throws TransactionException {
//分支事务注册
}
@Override
protected void doBranchReport(BranchReportRequest request, BranchReportResponse response, RpcContext rpcContext)
throws TransactionException {
//分支事务反馈
}
...
//下面还有很多请求处理的方法,就不一一列举了
}
}
4. 创建全局事务
这里我们看一下doGlobalBegin方法是如何处理的
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
throws TransactionException {
response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
request.getTransactionName(), request.getTimeout()));
...
}
从代码可以看出,调用了core.begin来获得xid然后返回给客户端
那我们参考core.begin方法来看一下
// DefaultCore.java
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
// 创建全局事务Session
GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
timeout);
MDC.put(RootContext.MDC_KEY_XID, session.getXid());
// 为Session中添加回调监听 SessionHolder.getRootSessionManager()
// 去获取一个全局Session管理器DataBaseSessionManager
// 观察者设计模式
session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
//全局事务开启
session.begin();
// transaction start event
eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));
return session.getXid();
}
再向下我们要关注一下全局Session管理器DataBaseSessionManager,进入到getRootSessionManager()方法中
//SessionHolder.java
public static SessionManager getRootSessionManager() {
if (ROOT_SESSION_MANAGER == null) {
throw new ShouldNeverHappenException("SessionManager is NOT init!");
}
return ROOT_SESSION_MANAGER;
}
// SessionHolder.java
//这个管理器如何生成的那,我们可以看一下init初始化方法
public static void init(String mode) {
...
StoreMode storeMode = StoreMode.get(mode);
if (StoreMode.DB.equals(storeMode)) {
// 通过SPI机制读取SessionManager接口实现类,读取的是META-INF.service目录
// 在通过反射机制创建对象DataBaseSessionManager
ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
...
} else if (StoreMode.FILE.equals(storeMode)) {
...
} else if (StoreMode.REDIS.equals(storeMode)) {
...
} else {
// unknown store
...
}
reload(storeMode);
}
从代码可以看出使用了观察者模式,当Session有变化时调用,从而触发去存储相关信息, SessionHolder.init方法是在main函数启动时执行的。如果是DB模式,是通过SPI去读取SessionManager的子类来处理的,默认情况下是:io.seata.server.storage.db.session.DataBaseSessionManager
接下来我们关注开启全局事务方法DefaultCore.begin方法中的 session.begin();
//GlobalSession.java
@Override
public void begin() throws TransactionException {
// 声明全局事务开始
this.status = GlobalStatus.Begin;
// 开始时间
this.beginTime = System.currentTimeMillis();
// 激活全局事务
this.active = true;
// 将SessionManager放入到集合中,调用onBegin方法
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onBegin(this);
}
}
这里就触发了监听器的onBegin方法,那对应的DataBaseSessionManager也会有相应的实现以响应,调用了onBegin方法后,会通过抽象实现掉到addGlobalSession方法
//DataBaseSessionManager.java
@Override
public void addGlobalSession(GlobalSession session) throws TransactionException {
if (StringUtils.isBlank(taskName)) {
// 保存global_session
boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
...
} else {
// 更新global_session
boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
...
}
}
然后我们来看transactionStoreManager.writeSession方法是如何写入的
// DataBaseTransactionStoreManager.java
@Override
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
// 第一次进入一定是写入
if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
} else {
throw new StoreException("Unknown LogOperation:" + logOperation.name());
}
}
因为我们第一次调用一定是写入,所以此时我们应该查看insertGlobalTransactionDO,此方法的作用就是写入全局事务表中global_table
// LogStoreDataBaseDAO.java
@Override
public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
Connection conn = null;
PreparedStatement ps = null;
try {
conn = logStoreDataSource.getConnection();
conn.setAutoCommit(true);
ps = conn.prepareStatement(sql);
ps.setString(1, globalTransactionDO.getXid());
ps.setLong(2, globalTransactionDO.getTransactionId());
ps.setInt(3, globalTransactionDO.getStatus());
ps.setString(4, globalTransactionDO.getApplicationId());
ps.setString(5, globalTransactionDO.getTransactionServiceGroup());
String transactionName = globalTransactionDO.getTransactionName();
transactionName = transactionName.length() > transactionNameColumnSize ? transactionName.substring(0,
transactionNameColumnSize) : transactionName;
ps.setString(6, transactionName);
ps.setInt(7, globalTransactionDO.getTimeout());
ps.setLong(8, globalTransactionDO.getBeginTime());
ps.setString(9, globalTransactionDO.getApplicationData());
return ps.executeUpdate() > 0;
} catch (SQLException e) {
throw new StoreException(e);
} finally {
IOUtil.close(ps, conn);
}
}
我们可以查看GlobalTransactionDO实体类的属性,和global_table 的字段进行比对,就能看出开启全局事务以后,把全局事务作为记录写入到了global_table表中了。
5. 图解调用流程Seata-源码分析图 | ProcessOn免费在线作图,在线流程图,在线思维导图 |
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)