最近回顾 Spring 事务相关的设计与实现,发现 Spring 事务设计的最初目的是为了统一 Java 中 JDBC、JTA 与 JPA 事务的使用方式,并且其实现参考了 JTA 规范。大多数人对 JDBC 都比较熟悉,而 JTA 和 JPA 由于使用较少,很多人对其比较陌生,尤其是 JTA。
网上 JTA 的相关文章,要么参照规范照本宣科的把原理简单介绍,要么就直接上 Spring Boot 整合 JTA 框架的代码,由于知识跨度比较大,增加了对 JTA 理解的难度。经过几天的不懈努力,查阅相关资料,我也对 JTA 有了一定的认识,这里将知识结构以循序渐进的方式进行介绍,也避免小伙伴们走弯路。
JTA 概述JTA 全称 Java Transaction API,是 X/OPEN CAE 规范中分布式事务 XA 规范在 Java 中的映射,是 Java 中使用事务的标准 API,同时支持单机事务与分布式事务。
作为 J2EE 平台规范的一部分,JTA 与 JDBC 类似,自身只提供了一组 Java 接口,需要由供应商来实现这些接口,与 JDBC 不同的是这些接口需要由不同的供应商来实现。
JTA 定义了分布式事务的 5 种角色,不同角色关注不同的内容,如下图所示。
1. 事务管理器
事务管理器 Transaction Manager 是分布式事务的核心,在 JTA 中使用 TransactionManager
接口表示,提供了事务 *** 作、资源管理、同步、事务传播等功能,等同于 Spring 中的 PlatformTransactionManager
。
- 事务管理:包括将事务对象存至线程上下文,开始事务,提交事务,回滚事务等常规 *** 作。
- 资源管理:将资源与事务对象建立关系,以便通过两阶段提交实现事务。
- 同步:同步原文为 Synchronization,其实是事务完成前后的一个回调接口。
- 事务传播:指在一个事务中开启子事务时,子事务的行为,与 Spring 事务传播行为类似。
2. 资源管理器
资源管理器 Resource Manager 提供了对资源访问的能力,典型的代表是关系型数据库、消息队列,在 JTA 中使用接口 XAResource
表示,通常通过一个资源适配器来实现,例如 JDBC 中的数据库驱动。
3. 通信资源管理器
通信资源管理器用于支持跨应用分布式事务的事务管理器之间的通信,JTA 规定它实现 JTS 规范定义的接口即可,通常用户不用关心。
4. 应用
使用分布式事务的应用,应用可以通过 JTA 规范中的 UserTransaction
接口来 *** 作事务。
5. 应用服务器
应用的运行环境,JTA 规定事务管理器应该由应用服务器来实现,如 jboss、weblogic、websphere,不过并非所有的应用服务器都实现了事务管理器,如 Tomcat。如果想在标准环境使用 JTA,可以使用支持 JTA 的第三发类库,如 Atomikos、Bitronix。
事务中有一个特性是原子性,它表示事务中的所有 *** 作要么全部完成,要么全部不完成。事务管理器通过两阶段提交实现这一特性。
两阶段提交将对事务的提交分成两部分,分别为准备阶段和提交阶段。
在准备阶段,事务管理器向资源管理器(如 JMS 消息队列或数据库)询问是否同意提交事务,如果资源管理器回复同意,则表示资源管理器可以将资源持久化。
在提交阶段,如果所有的资源管理器都回复同意,则事务管理器向所有的资源管理器发出提交请求,否则事务管理器向资源管理器发出回滚请求。
正常情况下的两阶段提交可以用如下的图来表示。
准备阶段失败导致事务回滚的两阶段提交过程可以用如下图来表示。
细心的小伙伴可能会想到一个问题,虽然准备阶段事务管理器与资源管理器正常交互,不过在提交阶段如果发生一些意外导致事务管理器与资源管理器之间的通信中断怎么办呢?这很可能导致分布式事务违反原子性。
- 如果资源管理器已经接收到事务管理器的提交请求,事务管理器恢复后要求资源管理器回滚将抛出
HeuristicCommitException
异常。 - 如果资源管理器决定回滚,事务管理器恢复后要求资源管理器提交,将抛出
HeuristicRollbackException
异常。 - 如果一部分资源管理器将资源提交,另一部分资源管理器将资源回滚,将导致
HeuristicMixedException
异常。
异常的场景可以用如下图来描述。
上面只是介绍了 JTA 相关的一些概念和基本原理,要想理解 JTA,还得看相关 API。
JTA API 分类JTA 的 API 大概可以分别为三部分,如下。
其中,应用使用的接口 UserTransaction
也同样需要事务管理器来实现。
XAResource
与 Xid
接口是 JDK 已提供的接口,位于包 javax.transaction.xa
包中。
其他的接口需要单独引入 jta 依赖,坐标如下。
<dependency>
<groupId>jakarta.transactiongroupId>
<artifactId>jakarta.transaction-apiartifactId>
<version>1.3.3version>
dependency>
其他接口位于包 javax.transaction
中。
除了 JTA 规范中的 API,特定于实现的资源管理器通常还会实现其他与 JTA 相关的接口。
1. JDBC
对于关系型数据库来说,还需要实现接口 javax.sql.XADataSource
与 javax.sql.XAConnection
。
XADataSource
接口定义如下。
public interface XADataSource extends CommonDataSource {
XAConnection getXAConnection() throws SQLException;
XAConnection getXAConnection(String user, String password) throws SQLException;
}
XAConnection
接口定义如下。
public interface XAConnection extends PooledConnection {
javax.transaction.xa.XAResource getXAResource() throws SQLException;
}
也就是说,通过 XADataSource
获取 XAConnection
,通过 XAConnection
获取 XAResource
,然后再进行资源的相关 *** 作。例如,MySQL 的 JDBC 驱动包就进行了相关实现。
另外一些第三方包也会进行相关实现,然后将实现作为代理,调用真正的实现,如 Druid。
2. JMS
对于 JMS 规范来说,消息队列作为资源管理器,除了要实现 XAResource
接口,还要实现 javax.jms.XASession
与 javax.jms.XAConnection
接口。这两个接口定义如下。
public interface XAConnection extends Connection {
XASession createXASession() throws JMSException;
Session createSession(boolean var1, int var2) throws JMSException;
}
public interface XASession extends Session {
Session getSession() throws JMSException;
XAResource getXAResource();
boolean getTransacted() throws JMSException;
void commit() throws JMSException;
void rollback() throws JMSException;
}
与 JDBC 类似,JMS 通过 XAConnection
获取 XASession
,通过 XASession
获取 XAResource
,然后再进行资源相关 *** 作。
JTA API 围绕事务的整个生命周期,一般来说,用户不用关心具体的 API,不过只有了解 JTA API 的设计才能理解其设计与实现。
TransactionManager事务管理器可以创建新的事务,并设置事务的相关属性,还允许应用获取创建后的事务,并且将事务与线程绑定。具体有以下方法。
- begin:创建新的事务,并且将事务与线程关联,如果不支持嵌套事务并且事务已存在将抛出 NotSupportedException 异常。
- commit:提交与线程关联的事务,并断开线程与事务的关联。如果没有事务与线程关联将抛出异常。实现将触发
Transaction.commit
、XAResource.prepare
、XAResource.commit
方法调用。 - rollback:回滚线程关联的事务,并断开线程与事务的关联。如果没有事务与线程关联也将抛出异常。实现将触发
Transaction.rollback
、XAResource.rollback
方法的调用。 - getTransaction:获取线程关联的事务 Transaction 对象,如果没有事务与当前线程关联则返回 null。
- setTansactionalTimeout:设置线程关联事务的超时秒数。
- getStatus:获取线程关联事务的状态。
- setRollbackOnly:设置线程关联事务的唯一结果是回滚。
- suspend:暂时挂起与线程关联的事务,将断开线程与事务的关联,方法返回的 Transaction 对象可作为
resume
方法参数恢复线程与事务的关系。实现将触发Transaction.delistResource
与XResource.end
方法的调用。 - resume:恢复指定事务与线程之间的关联。实现将触发
Transaction.enlistResource
、XAResource.start
方法的调用。
Transaction 事务接口用于对活动的事务进行 *** 作,这里活动的事务是指未提交的事务,对其他事务不可见。
- enlistResource:将资源添加到事务,以便执行两阶段提交,允许添加多个资源,资源的最终的 *** 作结果将与事务保持一致,即要么提交、要么回滚。实现将触发
XAResource.start
方开启事务分支。 - delistResource:解除资源与事务的关联。实现将触发
XAResource.end
方法调用结束事务分支。 - registerSynchronization:注册 Synchronization,接口方法将在事务完成前后被回调。
- commit:与
TransactionManager.commit
含义相同。 - getStatus:与
TransactionManager.getStatus
含义相同。 - rollback:与
TransactionManager.rollback
含义相同。 - setRollbackOnly:与
TransactionManager.setRollbackOnly
含义相同。
表示事务与资源的关联,一个事务可以关联多个资源。可以由资源管理器或事务管理器来实现这个接口。接口方法如下:
- getFormatId:获取全局事务格式 ID。
- getGlobalTransactionId:获取全局事务ID。
- getBranchQualifier:获取事务分支限定符。
XAResource 用来表示分布式事务角色中的资源管理器,资源适配器将实现 XAResource 接口以支持事务与资源的关联,允许不同的线程同时对 XAResource 进行 *** 作,但一个 XAResource 同时只能关联一个事务。事务提交时,触发资源管理器准备与提交。具体方法如下:
- start:开启事务分支,全局事务通过该方法与事务资源关联,由资源适配器获取资源(连接)时隐式触发。一但调用该方法资源将一直保持开启状态,直到释放(close)资源。
- end:结束事务分支,解除资源与事务的关联,调用该方法后可再调用 start 方法与其他事务建立关联。
- prepare:为 xid 指定的事务提交做准备。
- commit:提交 xid 指定的事务分支。
- rollback:回滚 xid 指定的事务分支。
- isSameRM:判断当前资源管理器是否与给定的资源管理器相同,用于
Transaction.enlistResource
添加资源时判断资源是否已添加。 - recover:用于意外导致事务管理器与资源管理器断开通信后,事务管理器恢复时查询准备好的事务。XAResource 在故障发生时未持久化,事务管理器需要有某种方法查找 XAResource,如通过 JNDI 查找机制,事务管理器可以忽略不属于它的事务。
- forget:用于忘记准备好的事务分支。
- getTransactionTimeout:获取事务超时秒数。
- setTransactionTimeout:设置事务超时秒数。
这个接口比较简单,用来表示事务完成前后的回调,由应用实现这个接口。这个接口未持久化,崩溃恢复事务后将丢失 Synchronization 实例。接口方法如下:
- beforeCompletion:事务提交前回调该方法。
- afterCompletion:事务提交或或回滚后调用该方法。
功能受限的事务接口,暴露给应用使用,由事务管理器实现。
方法包括 begin、commit、rollback、setRollbackOnly、getStatus、setTransactionTimeout,含义与 Transaction 中的方法相同,不再赘述。
JTA 活动事务交互前面已经介绍了事务提交、回滚、异常场景下各组件的交互,事务提交之前的活动事务也有自己的交互流程。根据前面 API 的介绍,可以大概总结流程如下。
需要注意的是只有 Connection 被 close 才会调用 Transaction.delistResource
释放资源,这意味着应该在 try{}finaly{}
中的 finally
块关闭连接。
了解 JTA API 之后我们可以通过实战的方式加深理解,由于目前 EJB 容器慢慢淡出了大家的视野,我们使用事务管理器的实现 Atomikos 加以演示。
Atomikos UserTransaction 实战使用 UserTransaction 需要了解 Atomikos 提供的两个类。
UserTransactionImp
:这个类实现是UserTransaction
的实现,内部封装了TransactionManager
。AtomikosDataSourceBean
:这个类是DataSource
的实现,内部封装了对XAResource
的相关 *** 作。
由于事务管理器和数据源都由 Atomikos 提供,因此其内部知道如何进行事务管理器、事务与资源之间的交互,例如可以将事务管理器设置为单例 bean,将事务/资源存到线程上下文。我们直接使用即可。
假定我们有一个 MySQL 数据库,数据库名为 test,表 user 数据结构如下。
create table user
(
id bigint unsigned auto_increment
primary key,
username varchar(20) null,
password varchar(20) null
)
我们可以测试使用 Atomikos 添加一条记录。首先引入依赖。
<dependency>
<groupId>jakarta.transactiongroupId>
<artifactId>jakarta.transaction-apiartifactId>
<version>1.3.3version>
dependency>
<dependency>
<groupId>com.atomikosgroupId>
<artifactId>transactions-jdbcartifactId>
<version>4.0.6version>
dependency>
<dependency>
<groupId>mysqlgroupId>
<artifactId>mysql-connector-javaartifactId>
<version>8.0.27version>
dependency>
先提供一个获取数据源的静态方法。
public class Application {
private static DataSource getDataSource() {
Properties properties = new Properties();
properties.put("url", "jdbc:mysql://127.0.0.1:3306/test");
properties.put("user", "root");
properties.put("password", "12345678");
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
ds.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
ds.setXaProperties(properties);
ds.setUniqueResourceName("resourceName");
ds.setPoolSize(10);
ds.setBorrowConnectionTimeout(60);
return ds;
}
}
其中代码中的 properties 用于配置 MysqlXADataSource 中连接数据库的属性值。
然后写一个测试方法。
public class Application {
public static void main(String[] args) throws Exception {
UserTransaction utx = new UserTransactionImp();
utx.setTransactionTimeout(60);
// 开启事务
utx.begin();
Connection conn = null;
PreparedStatement ps = null;
boolean error = false;
try {
conn = getDataSource().getConnection();
ps = conn.prepareStatement("insert into user(username,password) values('hkp','123')");
int count = ps.executeUpdate();
System.out.println("count" + count);
} catch (Exception e) {
error = true;
} finally {
// 先关闭 JDBC 中的 Statement 和 Connection
if (ps != null) {
ps.close();
}
if (conn != null) {
conn.close();
}
if (utx.getStatus() != Status.STATUS_NO_TRANSACTION) {
if (error) {
// 遇到异常回滚事务
utx.rollback();
} else {
// 正常提交事务
utx.commit();
}
}
}
}
}
可以看到,使用 Atomikos 提供的 UserTransaction
进行事务 *** 作方式与普通的 JDBC 基本一致,只是使用了 Atomikos 提供的数据源获取连接,然后在进行 JDBC *** 作前后添加了使用 UserTransaction
开启/结束事务的逻辑。
这里只是使用了一个数据源,也可以使用多个数据源开启分布式事务。
Atomikos TransactionManager 实战如果不想使用 AtomikosDataSourceBean
,也可以手动调用 JTA 的 API,标准环境使用 TransactionManager
的实现类 UserTransactionManager
即可,Web 环境也可以切换为 J2eeTransactionManager
。示例代码如下。
public class Application {
public static void main(String[] args) throws Exception {
TransactionManager tm = new UserTransactionManager();
tm.begin();
// 使用 MySQL XADataSource 的实现
MysqlXADataSource ds = new MysqlXADataSource();
ds.setURL("jdbc:mysql://127.0.0.1:3306/test");
ds.setUser("root");
ds.setPassword("12345678");
Connection conn = null;
PreparedStatement ps = null;
boolean error = false;
try {
// 获取 XAResource
XAConnection xaconn = ds.getXAConnection();
XAResource xares = xaconn.getXAResource();
// 从事务管理器中获取事务
Transaction tx = tm.getTransaction();
// 事务关联资源
tx.enlistResource(xares);
conn = xaconn.getConnection();
ps = conn.prepareStatement("insert into user(username,password) values('hkp','123')");
ps.executeUpdate();
// 事务与资源解除关联
tx.delistResource(xares, XAResource.TMSUCCESS);
} catch (Exception e) {
error = true;
} finally {
// 先使用事务管理器完成事务
if (tm.getStatus() != Status.STATUS_NO_TRANSACTION) {
if (error) {
// 遇到异常回滚事务
tm.rollback();
} else {
// 正常提交事务
tm.commit();
}
}
// 最后再关闭 JDBC 中的 Statement 和 Connection
if (ps != null) {
ps.close();
}
if (conn != null) {
conn.close();
}
}
}
}
代码完全遵循前面的示意图。
- 先创建
TransactionManager
。 - 然后调用
TransactionManager.begin
方法开启事务。 - 然后创建
XADataSource
并获取XAResource
。 - 然后获取
Transaction
并将其与XResource
关联。 - 然后就可以按照 JDBC 的正常流程执行 SQL 了。
- 执行 SQL 后先把断开事务与资源的关联关系。
- 最后提交事务后再 close JDBC 中的 Statement 和 Connection,避免先 close 导致事务管理器无法与数据库交互。
Spring 对 JDBC、JTA、JPA 的事务进行封装,提供了自己的事务管理器。
首先引入 Spring 相关依赖。
<dependency>
<groupId>org.springframeworkgroupId>
<artifactId>spring-txartifactId>
<version>5.2.6.RELEASEversion>
dependency>
<dependency>
<groupId>org.springframeworkgroupId>
<artifactId>spring-contextartifactId>
<version>5.2.6.RELEASEversion>
dependency>
对于 JTA 来说,配置如下。
@Configuration
@EnableTransactionManagement
public class JTAConfig {
@Bean(initMethod = "init", destroyMethod = "close")
public AtomikosDataSourceBean dataSource() {
Properties properties = new Properties();
properties.put("url", "jdbc:mysql://127.0.0.1:3306/test");
properties.put("user", "root");
properties.put("password", "12345678");
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
ds.setXaDataSourceClassName("com.mysql.cj.jdbc.MysqlXADataSource");
ds.setXaProperties(properties);
ds.setUniqueResourceName("resourceName");
ds.setPoolSize(10);
ds.setBorrowConnectionTimeout(60);
return ds;
}
@Bean(initMethod = "init", destroyMethod = "close")
public UserTransactionManager userTransactionManager() throws SystemException {
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setTransactionTimeout(300);
userTransactionManager.setForceShutdown(true);
return userTransactionManager;
}
@Bean
public JtaTransactionManager jtaTransactionManager() throws SystemException {
JtaTransactionManager jtaTransactionManager = new JtaTransactionManager();
jtaTransactionManager.setTransactionManager(userTransactionManager());
jtaTransactionManager.setUserTransaction(userTransactionManager());
return jtaTransactionManager;
}
}
然后定义我们 *** 作数据库的 UserService 如下。
@Service
public class UserService {
@Autowired
private DataSource dataSource;
@Transactional(rollbackFor = Exception.class)
public void testInsert() {
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement("insert into user(username,password) values('kkk','789')")) {
int count = ps.executeUpdate();
System.out.println("count" + count);
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
最后运行测试类。
public class Application {
public static void main(String[] args) throws Exception {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
context.scan("com.zzuhkp.demo");
context.refresh();
UserService userService = context.getBean(UserService.class);
userService.testInsert();
context.close();
}
}
成功将数据插入数据库,如果事务方法抛出异常则不会提交事务到数据库。
Spring Boot Atomikos 实战Spring Boot 环境下的 Atomikos 使用较为简单,首先引入相关依赖。
<dependency>
<groupId>mysqlgroupId>
<artifactId>mysql-connector-javaartifactId>
<version>8.0.27version>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-jdbcartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-jta-atomikosartifactId>
dependency>
然后在 application.properties
进行数据源相关配置。
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test
spring.datasource.username=root
spring.datasource.password=12345678
这样就完成了。
- 引入
spring-boot-starter-jta-atomikos
之后 Spring 会自动配置JtaTransactionManager
和AtomikosDataSourceBean
。 - 引入
spring-boot-starter-jdbc
则是为了引入事务相关依赖与功能特性。
仍然使用上述示例中的 UserService
,修改测试类如下。
@SpringBootApplication
public class Application implements CommandLineRunner {
public static void main(String[] args) throws Exception {
SpringApplication.run(Application.class, args);
}
@Autowired
private UserService userService;
@Override
public void run(String... args) throws Exception {
userService.testInsert();
}
}
可以正常提交事务,如果 UserService
抛出异常则回滚事务。多数据源的情况下手动配置多个 AtomikosDataSourceBean
作为 DataSource
bean 即可。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)