- SpringCloud-10: Seata处理分布式事务
- Seata
- Seata支持的分布式事务模式
- Seata安装
- example
- 创建数据库
- 新建Maven项目:seata-order-service
- 新建Maven项目:seata-storage-service
- 新建项目:seata-account-service
- 测试
官方文档
分布式事务:事务的参与者位于分布式系统的不同节点之上,一次大的 *** 作可能由许多个小的 *** 作组成,而每个小的 *** 作可能落到不同的节点上,分布式事务就需要保证这些小 *** 作要么全部成功,要么全部失败。
七种常见分布式事务详解(2PC、3PC、TCC、Saga、本地事务表、MQ事务消息、最大努力通知)
Seata Seata支持的分布式事务模式-
AT模式
2PC的变种,一阶段业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。二阶段进行提交或者根据undo_log回滚。
-
TCC模式
TCC(Try Confirm Cancel)是应用层的两阶段提交。一阶段为Try,第二阶段为Confirm或者Cancel。每个业务逻辑都需要实现Try、Confirm和Cancel *** 作。代码侵入性强。
-
SAGA模式
SAGA事务的思想是将长事务拆分为多个本地短事务并依次正常提交,如果所有短事务均执行成功,那么分布式事务提交;如果出现某个参与者执行失败,则由Saga事务协调器进行回滚。
-
XA模式
XA 协议是由 X/Open 组织提出的分布式事务处理规范,主要定义了事务管理器 TM 和局部资源管理器 RM 之间的接口。目前主流的数据库,比如 oracle、DB2 都是支持 XA 协议的。mysql 从 5.0 版本开始,innoDB 存储引擎已经支持 XA 协议。
数据库XA语法:
XA {START|BEGIN} xid [JOIN|RESUME] XA END xid [SUSPEND [FOR MIGRATE]] XA PREPARE xid XA COMMIT xid [ONE PHASE] XA ROLLBACK xid XA RECOVER [FORMAT=['RAW'|'SQL']] xid: gtrid [, bqual [, formatID ]]
6 张图带你彻底搞懂分布式事务 XA 模式
AT模式、XA模式是业务无侵入的,而TCC模式和Saga是业务侵入的。
Seata分布式事务默认是AT模式。
术语:
-
Transaction ID XID
全局唯一的事务ID,根据XID区分不同的分布式事务
-
TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。就是seata server。
- TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。@GlobalTransactional
注解的业务,也就是事务发起方。
- RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。事务的参与方,简单理解就是TM业务中,所有被调用的微服务,都作为RM角色。
处理过程:
TM
向TC
申请开启一个全局事务,全局事务创建成功返回一个全局唯一的XID;- XID在微服务调用链路的上下文中传播;
RM
向TC
注册分支事务,将其纳入XID对应全局事务的管辖;RM
向TC
报告分支事务状态;TC
调度XID下管辖的全部分支事务由RM
完成提交或回滚请求;TM
结束全局事务。
Github下载地址 v1.4.2
下载后解压,修改registry.conf
文件,配置注册中心。这里还是使用Nacos
。
v1.4.2只需要修改
registry.conf
文件
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "seckill.cc:1111"
group = "SEATA_GROUP"
namespace = "seata"
cluster = "default"
username = ""
password = ""
}
...
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"
nacos {
serverAddr = "seckill.cc:1111"
namespace = "seata"
group = "SEATA_GROUP"
username = ""
password = ""
dataId = "seataServer.properties"
}
...
}
切换到Nacos,创建一个新的命名空间,记住命名空间ID。
添加配置文件:
内容如下:
参考这个文件写:https://github.com/seata/seata/blob/develop/script/config-center/config.txt
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=true
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
transport.serialization=seata
transport.compressor=none
# server
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
server.distributedLockExpireTime=10000
# store
#model改为db
store.mode=db
store.lock.mode=file
store.session.mode=file
# store.publicKey=""
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
# 改为上面创建的seata服务数据库
store.db.url=jdbc:mysql://ip:3306/seata?useUnicode=true&rewriteBatchedStatements=true
# 改为自己的数据库用户名
store.db.user=root
# 改为自己的数据库密码
store.db.password=
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.distributedLockTable=distributed_lock
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.mode=single
store.redis.single.host=127.0.0.1
store.redis.single.port=6379
# store.redis.sentinel.masterName=""
# store.redis.sentinel.sentinelHosts=""
store.redis.maxConn=10
store.redis.minConn=1
store.redis.maxTotal=100
store.redis.database=0
# store.redis.password=""
store.redis.queryLimit=100
# log
log.exceptionRate=100
# metrics
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
# service
# 自己命名一个vgroupMapping
service.vgroupMapping.test-tx-group=default
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
# client
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.rm.tccActionInterceptorOrder=-2147482648
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
client.tm.interceptorOrder=-2147482648
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
创建数据库seata,并建表:
建表脚本:https://github.com/seata/seata/tree/develop/script/server/db
MySQL:
-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(128),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_status` (`status`),
KEY `idx_branch_id` (`branch_id`),
KEY `idx_xid_and_branch_id` (`xid` , `branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
CREATE TABLE IF NOT EXISTS `distributed_lock`
(
`lock_key` CHAR(20) NOT NULL,
`lock_value` VARCHAR(20) NOT NULL,
`expire` BIGINT,
primary key (`lock_key`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);
启动seata
nohup bash bin/seata-server.sh -p 8091 -h 可以ping通的IP &
Nacos中如果能看到对应的服务,说明安装启动成功。
example业务需求:下订单
-> 减库存
->扣余额
->改变订单状态
创建三个数据库:
CREATE DATABASE seata_order;
CREATE DATABASE seata_storage;
CREATE DATABASE seata_account;
在三个数据库下分别建表:
- seata_order
CREATE TABLE t_oder (
`id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
`user_id` BIGINT(11) DEFAULT NULL COMMENT 'user id',
`product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
`count` INT(11) DEFAULT NULL,
`money` DECIMAL(11, 0) DEFAULT NULL,
`status` INT(1) DEFAULT NULL COMMENT '订单状态, 0:创建中, 1:已完结'
) ENGINE=INNODB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;
- seata_storage
CREATE TABLE t_storage (
`id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
`product_id` BIGINT(11) DEFAULT NULL COMMENT '产品id',
`total` INT(11) DEFAULT NULL COMMENT '总库存',
`used` INT(11) DEFAULT NULL COMMENT '已用库存',
`residue` INT(11) DEFAULT NULL COMMENT '剩余库存'
) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
INSERT INTO seata_storage.t_storage(`id`, `product_id`, `total`, `used`, `residue`) VALUES ('1', '1', '100', '0', '100');
- seata_account
CREATE TABLE t_account (
`id` BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT 'id',
`user_id` BIGINT(11) DEFAULT NULL COMMENT '用户id',
`total` DECIMAL(10,0) DEFAULT NULL COMMENT '总额度',
`used` DECIMAL(10,0) DEFAULT NULL COMMENT '已用余额',
`residue` DECIMAL(10,0) DEFAULT '0' COMMENT '剩余可用额度'
) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
INSERT INTO seata_account.t_account(`id`, `user_id`, `total`, `used`, `residue`) VALUES ('1', '1', '1000', '0', '1000');
为每个数据库建立回滚表:
-- 注意此处0.7.0+ 增加字段 context
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; COMMENT ='AT transaction mode undo table';
新建Maven项目:seata-order-service
-
pom.xml
:<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>seckillartifactId> <groupId>cc.seckillgroupId> <version>1.0-SNAPSHOTversion> parent> <modelVersion>4.0.0modelVersion> <artifactId>cloud-seata-order-service2001artifactId> <properties> <maven.compiler.source>8maven.compiler.source> <maven.compiler.target>8maven.compiler.target> properties> <dependencies> <dependency> <groupId>org.springframeworkgroupId> <artifactId>spring-context-supportartifactId> dependency> <dependency> <groupId>com.alibaba.cloudgroupId> <artifactId>spring-cloud-starter-alibaba-seataartifactId> <exclusions> <exclusion> <groupId>io.seatagroupId> <artifactId>seata-spring-boot-starterartifactId> exclusion> exclusions> dependency> <dependency> <groupId>io.seatagroupId> <artifactId>seata-spring-boot-starterartifactId> <version>1.4.2version> dependency> <dependency> <groupId>cc.seckillgroupId> <artifactId>cloud-api-commonsartifactId> <version>${project.version}version> dependency> <dependency> <groupId>com.alibaba.cspgroupId> <artifactId>sentinel-datasource-nacosartifactId> dependency> <dependency> <groupId>com.alibaba.cloudgroupId> <artifactId>spring-cloud-starter-alibaba-sentinelartifactId> dependency> <dependency> <groupId>org.springframework.cloudgroupId> <artifactId>spring-cloud-starter-openfeignartifactId> dependency> <dependency> <groupId>org.springframework.cloudgroupId> <artifactId>spring-cloud-loadbalancerartifactId> dependency> <dependency> <groupId>com.alibaba.cloudgroupId> <artifactId>spring-cloud-starter-alibaba-nacos-discoveryartifactId> dependency> <dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-starter-webartifactId> dependency> <dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-starter-actuatorartifactId> dependency> <dependency> <groupId>org.mybatis.spring.bootgroupId> <artifactId>mybatis-spring-boot-starterartifactId> dependency> <dependency> <groupId>com.alibabagroupId> <artifactId>druid-spring-boot-starterartifactId> dependency> <dependency> <groupId>mysqlgroupId> <artifactId>mysql-connector-javaartifactId> dependency> <dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-starter-jdbcartifactId> dependency> <dependency> <groupId>com.baomidougroupId> <artifactId>mybatis-plus-boot-starterartifactId> dependency> <dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-devtoolsartifactId> <scope>runtimescope> <optional>trueoptional> dependency> <dependency> <groupId>org.projectlombokgroupId> <artifactId>lombokartifactId> <optional>trueoptional> dependency> <dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-starter-testartifactId> <scope>testscope> dependency> <dependency> <groupId>com.github.ulisesbocchiogroupId> <artifactId>jasypt-spring-boot-starterartifactId> dependency> dependencies> project>
-
yaml
:server: port: 2001 spring: application: name: seata-order-service cloud: nacos: discovery: server-addr: seckill.cc:1111 loadbalancer: cache: enabled: true caffeine: spec: initialCapacity=500,expireAfterWrite=5s datasource: type: com.alibaba.druid.pool.DruidDataSource # 数据源 *** 作类型 driver-class-name: org.gjt.mm.mysql.Driver # mysql 驱动 url: jdbc:mysql://mysql_server:3306/db_seckill?useUnicode=true&characterEncoding=utf-8&useSSL=false username: root password: ENC(JQaDTMkm+6SfkR02THGL4ir9FQ+CdlT+Q1c1i1beugd3VVULMR19YBfiksl7+xoP) druid: test-while-idle: false seata: enabled: true enable-auto-data-source-proxy: true #是否开启数据源自动代理,默认为true tx-service-group: test-tx-group #要与配置文件中的vgroupMapping一致 registry: #registry根据seata服务端的registry配置 type: nacos #默认为file nacos: application: seata-server #配置自己的seata服务 server-addr: seckill.cc:1111 #根据自己的seata服务配置 username: nacos password: ENC(JQaDTMkm+6SfkR02THGL4ir9FQ+CdlT+Q1c1i1beugd3VVULMR19YBfiksl7+xoP) namespace: 2fa7bcca-4687-4b7d-9434-1c8a0df249df # seata-server在nacos的命名空间ID cluster: default # 配置自己的seata服务cluster, 默认为 default group: SEATA_GROUP # seata-server在nacos的分组 config: type: nacos #默认file,如果使用file不配置下面的nacos,直接配置seata.service nacos: server-addr: seckill.cc:1111 #配置自己的nacos地址 group: SEATA_GROUP #配置自己的dev username: nacos password: ENC(JQaDTMkm+6SfkR02THGL4ir9FQ+CdlT+Q1c1i1beugd3VVULMR19YBfiksl7+xoP) namespace: 2fa7bcca-4687-4b7d-9434-1c8a0df249df #配置自己的dataId,由于搭建服务端时把客户端的配置也写在了seataServer.properties, # 所以这里用了和服务端一样的配置文件,实际客户端和服务端的配置文件分离出来更好 dataId: seataServer.properties mybatis: mapper-locations: classpath:mapper/*.xml type-aliases-package: cc.seckill.srpingcloud.entities mybatis-plus: configuration: log-impl: org.apache.ibatis.logging.stdout.StdOutImpl mapper-locations: classpath:mapper/*.xml jasypt: encryptor: # password: algorithm: PBEWITHHMACSHA512ANDAES_256 feign: client: config: default: connectTimeout: 1000 readTimeout: 1000 loggerLevel: basic
-
domain
:@Data @AllArgsConstructor @NoArgsConstructor @TableName(value = "t_order") public class Order { private Long id; private Long userId; private Long productId; private Integer count; private BigDecimal money; //订单状态 0 创建中, 1 已完成 private Integer status; }
-
DAO
:@Mapper public interface OrderMapper extends BaseMapper<Order> { }
-
service
:public interface OrderService { void create(Order order); } // **************************************************************; @FeignClient(value = "seata-account-service") @Component public interface AccountService { @PostMapping("/account/decrease") Result decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money); } // **************************************************************; @FeignClient(value = "seata-storage-service") @Component public interface StorageService { @PostMapping("/storage/decrease") Result decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count); } // **************************************************************; @Service @Slf4j public class OrderServiceImpl implements OrderService { @Resource private AccountService accountService; @Resource private StorageService storageService; @Resource private OrderMapper orderMapper; @Override public void create(Order order) { log.info("新建订单: {}", order.getId()); orderMapper.insert(order); log.info("订单微服务开始调用库存服务, 开始扣减库存"); storageService.decrease(order.getProductId(), order.getCount()); log.info("订单微服务, 扣减库存完成"); log.info("订单微服务开始调用账号服务, 开始减余额"); accountService.decrease(order.getUserId(), order.getMoney()); log.info("订单微服务调用账号服务, 减余额完成"); // 修改订单状态 log.info("修改订单状态: {}", order.getId()); order.setStatus(1); orderMapper.updateById(order); log.info("修改订单状态完成 status=: {}", order.getStatus()); } }
-
controller
:@RestController public class OderController { @Resource private OrderService orderService; @GetMapping("/order/create") public Result create(Order order) { orderService.create(order); return new Result() .msg("订单创建成功") .code(200); } }
-
主启动类
@SpringBootApplication @EnableAutoDataSourceProxy @EnableDiscoveryClient @EnableFeignClients public class SeataOrderMain { public static void main(String[] args) { EnvironmentVariableInit.init(); SpringApplication.run(SeataOrderMain.class, args); } }
-
pom.xml
:<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>seckillartifactId> <groupId>cc.seckillgroupId> <version>1.0-SNAPSHOTversion> parent> <modelVersion>4.0.0modelVersion> <artifactId>cloud-seata-storage-service2002artifactId> <properties> <maven.compiler.source>8maven.compiler.source> <maven.compiler.target>8maven.compiler.target> properties> <dependencies> <dependency> <groupId>org.springframeworkgroupId> <artifactId>spring-context-supportartifactId> dependency> <dependency> <groupId>com.alibaba.cloudgroupId> <artifactId>spring-cloud-starter-alibaba-seataartifactId> <exclusions> <exclusion> <groupId>io.seatagroupId> <artifactId>seata-spring-boot-starterartifactId> exclusion> exclusions> dependency> <dependency> <groupId>io.seatagroupId> <artifactId>seata-spring-boot-starterartifactId> <version>1.4.2version> dependency> <dependency> <groupId>cc.seckillgroupId> <artifactId>cloud-api-commonsartifactId> <version>${project.version}version> dependency> <dependency> <groupId>com.alibaba.cspgroupId> <artifactId>sentinel-datasource-nacosartifactId> dependency> <dependency> <groupId>com.alibaba.cloudgroupId> <artifactId>spring-cloud-starter-alibaba-sentinelartifactId> dependency> <dependency> <groupId>org.springframework.cloudgroupId> <artifactId>spring-cloud-starter-openfeignartifactId> dependency> <dependency> <groupId>org.springframework.cloudgroupId> <artifactId>spring-cloud-loadbalancerartifactId> dependency> <dependency> <groupId>com.alibaba.cloudgroupId> <artifactId>spring-cloud-starter-alibaba-nacos-discoveryartifactId> dependency> <dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-starter-webartifactId> dependency> <dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-starter-actuatorartifactId> dependency> <dependency> <groupId>org.mybatis.spring.bootgroupId> <artifactId>mybatis-spring-boot-starterartifactId> dependency> <dependency> <groupId>com.alibabagroupId> <artifactId>druid-spring-boot-starterartifactId> dependency> <dependency> <groupId>mysqlgroupId> <artifactId>mysql-connector-javaartifactId> dependency> <dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-starter-jdbcartifactId> dependency> <dependency> <groupId>com.baomidougroupId> <artifactId>mybatis-plus-boot-starterartifactId> dependency> <dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-devtoolsartifactId> <scope>runtimescope> <optional>trueoptional> dependency> <dependency> <groupId>org.projectlombokgroupId> <artifactId>lombokartifactId> <optional>trueoptional> dependency> <dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-starter-testartifactId> <scope>testscope> dependency> <dependency> <groupId>com.github.ulisesbocchiogroupId> <artifactId>jasypt-spring-boot-starterartifactId> dependency> dependencies> project>
-
yaml
:server: port: 2002 spring: application: name: seata-storage-service cloud: nacos: discovery: server-addr: seckill.cc:1111 datasource: type: com.alibaba.druid.pool.DruidDataSource # 数据源 *** 作类型 driver-class-name: org.gjt.mm.mysql.Driver # mysql 驱动 url: jdbc:mysql://tx_cloud:3306/seata_storage?useUnicode=true&characterEncoding=utf-8&useSSL=false username: root password: ENC(JQaDTMkm+6SfkR02THGL4ir9FQ+CdlT+Q1c1i1beugd3VVULMR19YBfiksl7+xoP) druid: test-while-idle: false mybatis: mapper-locations: classpath:mapper/*.xml type-aliases-package: cc.seckill.srpingcloud.entities mybatis-plus: configuration: log-impl: org.apache.ibatis.logging.stdout.StdOutImpl mapper-locations: classpath:mapper/*.xml jasypt: encryptor: # password: algorithm: PBEWITHHMACSHA512ANDAES_256 feign: client: config: default: connectTimeout: 1000 readTimeout: 1000 loggerLevel: basic seata: enabled: true enable-auto-data-source-proxy: true #是否开启数据源自动代理,默认为true tx-service-group: test-tx-group #要与配置文件中的vgroupMapping一致 registry: #registry根据seata服务端的registry配置 type: nacos #默认为file nacos: application: seata-server #配置自己的seata服务 server-addr: seckill.cc:1111 #根据自己的seata服务配置 username: nacos password: ENC(JQaDTMkm+6SfkR02THGL4ir9FQ+CdlT+Q1c1i1beugd3VVULMR19YBfiksl7+xoP) namespace: 2fa7bcca-4687-4b7d-9434-1c8a0df249df # seata-server在nacos的命名空间ID cluster: default # 配置自己的seata服务cluster, 默认为 default group: SEATA_GROUP # seata-server在nacos的分组 config: type: nacos #默认file,如果使用file不配置下面的nacos,直接配置seata.service nacos: server-addr: seckill.cc:1111 #配置自己的nacos地址 group: SEATA_GROUP #配置自己的dev username: nacos password: ENC(JQaDTMkm+6SfkR02THGL4ir9FQ+CdlT+Q1c1i1beugd3VVULMR19YBfiksl7+xoP) namespace: 2fa7bcca-4687-4b7d-9434-1c8a0df249df #配置自己的dataId,由于搭建服务端时把客户端的配置也写在了seataServer.properties, # 所以这里用了和服务端一样的配置文件,实际客户端和服务端的配置文件分离出来更好 dataId: seataServer.properties
-
domain
:@Data @TableName(value = "t_storage") public class Storage { private Long id; private Long productId; private Integer total; private Integer used; private Integer residue; }
-
dao
:@Mapper public interface StorageMapper extends BaseMapper<Storage> { }
-
service
:public interface StorageService { void decrease(Long productId, Integer count); } // ******************************************************************; @Service @Slf4j public class StorageServiceImpl implements StorageService { @Resource private StorageMapper storageMapper; @Override public void decrease(Long productId, Integer count) { log.info("------->storage-service 中扣减库存开始 "); Storage storage = storageMapper.selectById(productId); storage.setUsed(storage.getUsed() + count); storage.setResidue(storage.getResidue() - count); storageMapper.updateById(storage); log.info("------->storage-service 中扣减库存结束 "); } }
-
controller
:@RestController public class StorageController { @Resource private StorageService storageService; @RequestMapping("/storage/decrease") public Result decrease(Long productId, Integer count) { storageService.decrease(productId, count); return new Result() .code(200) .msg("扣减库存成功"); } }
-
主启动类:
@SpringBootApplication @EnableFeignClients @EnableDiscoveryClient @EnableAutoDataSourceProxy public class StorageMain { public static void main(String[] args) { EnvironmentVariableInit.init(); SpringApplication.run(StorageMain.class, args); } }
-
pom.xml
:<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>seckillartifactId> <groupId>cc.seckillgroupId> <version>1.0-SNAPSHOTversion> parent> <modelVersion>4.0.0modelVersion> <artifactId>cloud-seata-account-service2003artifactId> <properties> <maven.compiler.source>8maven.compiler.source> <maven.compiler.target>8maven.compiler.target> properties> <dependencies> <dependency> <groupId>org.springframeworkgroupId> <artifactId>spring-context-supportartifactId> dependency> <dependency> <groupId>com.alibaba.cloudgroupId> <artifactId>spring-cloud-starter-alibaba-seataartifactId> <exclusions> <exclusion> <groupId>io.seatagroupId> <artifactId>seata-spring-boot-starterartifactId> exclusion> exclusions> dependency> <dependency> <groupId>io.seatagroupId> <artifactId>seata-spring-boot-starterartifactId> <version>1.4.2version> dependency> <dependency> <groupId>cc.seckillgroupId> <artifactId>cloud-api-commonsartifactId> <version>${project.version}version> dependency> <dependency> <groupId>com.alibaba.cspgroupId> <artifactId>sentinel-datasource-nacosartifactId> dependency> <dependency> <groupId>com.alibaba.cloudgroupId> <artifactId>spring-cloud-starter-alibaba-sentinelartifactId> dependency> <dependency> <groupId>org.springframework.cloudgroupId> <artifactId>spring-cloud-starter-openfeignartifactId> dependency> <dependency> <groupId>org.springframework.cloudgroupId> <artifactId>spring-cloud-loadbalancerartifactId> dependency> <dependency> <groupId>com.alibaba.cloudgroupId> <artifactId>spring-cloud-starter-alibaba-nacos-discoveryartifactId> dependency> <dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-starter-webartifactId> dependency> <dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-starter-actuatorartifactId> dependency> <dependency> <groupId>org.mybatis.spring.bootgroupId> <artifactId>mybatis-spring-boot-starterartifactId> dependency> <dependency> <groupId>com.alibabagroupId> <artifactId>druid-spring-boot-starterartifactId> dependency> <dependency> <groupId>mysqlgroupId> <artifactId>mysql-connector-javaartifactId> dependency> <dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-starter-jdbcartifactId> dependency> <dependency> <groupId>com.baomidougroupId> <artifactId>mybatis-plus-boot-starterartifactId> dependency> <dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-devtoolsartifactId> <scope>runtimescope> <optional>trueoptional> dependency> <dependency> <groupId>org.projectlombokgroupId> <artifactId>lombokartifactId> <optional>trueoptional> dependency> <dependency> <groupId>org.springframework.bootgroupId> <artifactId>spring-boot-starter-testartifactId> <scope>testscope> dependency> <dependency> <groupId>com.github.ulisesbocchiogroupId> <artifactId>jasypt-spring-boot-starterartifactId> dependency> dependencies> project>
-
yaml
:server: port: 2003 spring: application: name: seata-account-service cloud: nacos: discovery: server-addr: seckill.cc:1111 datasource: type: com.alibaba.druid.pool.DruidDataSource # 数据源 *** 作类型 driver-class-name: org.gjt.mm.mysql.Driver # mysql 驱动 url: jdbc:mysql://tx_cloud:3306/seata_account?useUnicode=true&characterEncoding=utf-8&useSSL=false username: root password: ENC(JQaDTMkm+6SfkR02THGL4ir9FQ+CdlT+Q1c1i1beugd3VVULMR19YBfiksl7+xoP) druid: test-while-idle: false mybatis: mapper-locations: classpath:mapper/*.xml type-aliases-package: cc.seckill.srpingcloud.entities mybatis-plus: configuration: log-impl: org.apache.ibatis.logging.stdout.StdOutImpl mapper-locations: classpath:mapper/*.xml feign: client: config: default: connectTimeout: 1000 readTimeout: 1000 loggerLevel: basic seata: enabled: true enable-auto-data-source-proxy: true #是否开启数据源自动代理,默认为true tx-service-group: test-tx-group #要与配置文件中的vgroupMapping一致 registry: #registry根据seata服务端的registry配置 type: nacos #默认为file nacos: application: seata-server #配置自己的seata服务 server-addr: seckill.cc:1111 #根据自己的seata服务配置 username: nacos password: ENC(JQaDTMkm+6SfkR02THGL4ir9FQ+CdlT+Q1c1i1beugd3VVULMR19YBfiksl7+xoP) namespace: 2fa7bcca-4687-4b7d-9434-1c8a0df249df # seata-server在nacos的命名空间ID cluster: default # 配置自己的seata服务cluster, 默认为 default group: SEATA_GROUP # seata-server在nacos的分组 config: type: nacos #默认file,如果使用file不配置下面的nacos,直接配置seata.service nacos: server-addr: seckill.cc:1111 #配置自己的nacos地址 group: SEATA_GROUP #配置自己的dev username: nacos password: ENC(JQaDTMkm+6SfkR02THGL4ir9FQ+CdlT+Q1c1i1beugd3VVULMR19YBfiksl7+xoP) namespace: 2fa7bcca-4687-4b7d-9434-1c8a0df249df #配置自己的dataId,由于搭建服务端时把客户端的配置也写在了seataServer.properties, # 所以这里用了和服务端一样的配置文件,实际客户端和服务端的配置文件分离出来更好 dataId: seataServer.properties
-
domain
:@Data @AllArgsConstructor @NoArgsConstructor @TableName(value = "t_account") public class Account { private Long id; private Long userId; private BigDecimal total; private BigDecimal used; private BigDecimal residue; }
-
dao
:@Mapper public interface AccountMapper extends BaseMapper<Account> { }
-
service
:public interface AccountService { void decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money); } // ******************************************************************; @Service @Slf4j public class AccountServiceImpl implements AccountService { @Resource private AccountMapper accountMapper; @Override public void decrease(Long userId, BigDecimal money) { log.info("------->account-service 中扣减账户余额开始 "); QueryWrapper<Account> queryWrapper = new QueryWrapper<>(); queryWrapper.lambda() .eq(Account::getUserId, userId); Account account = accountMapper.selectOne(queryWrapper); account.setUsed(account.getUsed().add(money)); account.setResidue(account.getResidue().subtract(money)); accountMapper.updateById(account); log.info("------->account-service 中扣减账户余额结束 "); } }
-
controller
:@RestController public class AccountController { @Resource private AccountService accountService; @RequestMapping(value = "/account/decrease") public Result decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money) { accountService.decrease(userId, money); return new Result() .code(200) .msg("扣减余额成功"); } }
-
主启动类
@SpringBootApplication @EnableAutoDataSourceProxy @EnableFeignClients @EnableDiscoveryClient public class AccountMain { public static void main(String[] args) { EnvironmentVariableInit.init(); SpringApplication.run(AccountMain.class, args); } }
现在还没有开启分布式事务,会出问题。
浏览器输入:http://localhost:2001/order/create?userId=1&productId=1&count=10&money=100
查看对应表变化:
- order表多了一条记录:
-
storage表记录变化正常
-
account表记录变化正常
因为我们这个调用是单线程的,所以在每个微服务都不出异常的情况下,是没问题的。
现在模拟某个微服务出错。
修改 AccountServiceImpl
,添加一个超时的方法。因为我们在调用方2001的OpenFeign中设置的默认超时时间是1s,所以对这里的调用会报超时异常。
@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
@Resource
private AccountMapper accountMapper;
@Override
public void decrease(Long userId, BigDecimal money) {
log.info("------->account-service 中扣减账户余额开始 ");
QueryWrapper<Account> queryWrapper = new QueryWrapper<>();
queryWrapper.lambda()
.eq(Account::getUserId, userId);
try {
// 模拟超时异常 调用方Feign超时时间设置的是1s所以一定会报错
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Account account = accountMapper.selectOne(queryWrapper);
account.setUsed(account.getUsed().add(money));
account.setResidue(account.getResidue().subtract(money));
accountMapper.updateById(account);
log.info("------->account-service 中扣减账户余额结束 ");
}
}
再次调用发现报错:
查看订单:
可以看到新插入的订单状态为NULL
,说明accountService.decrease(order.getUserId(), order.getMoney());
这句之后的语句没有执行(因为这句报错了,超时)
@Override
public void create(Order order) {
log.info("订单信息: {}", order);
orderMapper.insert(order);
log.info("订单微服务开始调用库存服务, 开始扣减库存");
storageService.decrease(order.getProductId(), order.getCount());
log.info("订单微服务, 扣减库存完成");
log.info("订单微服务开始调用账号服务, 开始减余额");
accountService.decrease(order.getUserId(), order.getMoney());
log.info("订单微服务调用账号服务, 减余额完成");
// 修改订单状态
log.info("修改订单状态: {}", order.getId());
order.setStatus(1);
orderMapper.updateById(order);
log.info("修改订单状态完成 status=: {}", order.getStatus());
}
从控制栏也能看出确实报错了。
但是storage表和account表是正常的(account表也有可能不正常,如果Feign配置了超时重试的话)
在业务方法上添加@GlobalTransactional
注解,即可开启全局事务。
@Override
@GlobalTransactional(name = "test_global_xid", rollbackFor = Exception.class)
public void create(Order order) {
log.info("订单信息: {}", order);
orderMapper.insert(order);
log.info("订单微服务开始调用库存服务, 开始扣减库存");
storageService.decrease(order.getProductId(), order.getCount());
log.info("订单微服务, 扣减库存完成");
log.info("订单微服务开始调用账号服务, 开始减余额");
accountService.decrease(order.getUserId(), order.getMoney());
log.info("订单微服务调用账号服务, 减余额完成");
// 修改订单状态
log.info("修改订单状态: {}", order.getId());
order.setStatus(1);
orderMapper.updateById(order);
log.info("修改订单状态完成 status=: {}", order.getStatus());
}
重启后,再次执行
可以看到仍然是报错了,但是这个时候再查看数据库
三个表都是正常的,没有再发生不一致现象。
查看控制台,可以看到事务回滚了。
Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@43d2c217]
2022-05-07 19:01:17.424 INFO 26399 --- [nio-2002-exec-7] c.s.s.service.impl.StorageServiceImpl : ------->storage-service 中扣减库存结束
2022-05-07 19:01:17.800 INFO 26399 --- [h_RMROLE_1_1_16] i.s.r.d.undo.AbstractUndoLogManager : xid 202.199.6.118:8091:6152177009216172033 branch 6152177009216172039, undo_log deleted with GlobalFinished
2022-05-07 19:01:17.844 INFO 26399 --- [h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked
Closing non transactional SqlSession [org.apache.ibatis.session.defaults.DefaultSqlSession@6701e822]
2022-05-07 19:01:16.224 INFO 45763 --- [nio-2001-exec-1] c.s.s.service.impl.OrderServiceImpl : 订单微服务开始调用库存服务, 开始扣减库存
2022-05-07 19:01:17.875 INFO 45763 --- [h_RMROLE_1_1_16] i.s.c.r.p.c.RmBranchRollbackProcessor : rm handle branch rollback process:xid=202.199.6.118:8091:6152177009216172033,branchId=6152177009216172035,branchType=AT,resourceId=jdbc:mysql://tx_cloud:3306/seata_order,applicationData=null
2022-05-07 19:01:17.877 INFO 45763 --- [h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 202.199.6.118:8091:6152177009216172033 6152177009216172035 jdbc:mysql://tx_cloud:3306/seata_order
2022-05-07 19:01:18.365 INFO 45763 --- [h_RMROLE_1_1_16] i.s.r.d.undo.AbstractUndoLogManager : xid 202.199.6.118:8091:6152177009216172033 branch 6152177009216172035, undo_log deleted with GlobalFinished
2022-05-07 19:01:18.410 INFO 45763 --- [h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked
2022-05-07 19:01:18.471 INFO 45763 --- [nio-2001-exec-1] i.seata.tm.api.DefaultGlobalTransaction : Suspending current transaction, xid = 202.199.6.118:8091:6152177009216172033
2022-05-07 19:01:18.472 INFO 45763 --- [nio-2001-exec-1] i.seata.tm.api.DefaultGlobalTransaction : [202.199.6.118:8091:6152177009216172033] rollback status: Rollbacked
2022-05-07 19:01:18.498 ERROR 45763 --- [nio-2001-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is feign.RetryableException: Read timed out executing POST http://seata-storage-service/storage/decrease?productId=1&count=10] with root cause
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)