- 本地方法发送消息至borker
- borker 执行TransactionListener的方法。执行本地事务executeLocalTransaction
- 如果返回unkow状态,borker会回调checkLocalTransaction检查事务的结果,返回提交还是回滚
- borker设置消息对外可见,等待消费或主动推送
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
- TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
事务的流程如下图简述
实例代码
@Bean @Lazy public TransactionMQProducer defaultTransactionMQProducer() throws MQClientException { TransactionMQProducer producer = new TransactionMQProducer(TsitMQGroup.PROVIDER_INDUSTRY_GROUP); producer.setNamesrvAddr(ipcProps.getRocketMQServer()); producer.start(); log.info("init ipc defaultTransactionMQProducer"); return producer; }
发送事务消息
// 自定义线程池 transactionMQProducer.setExecutorService(ThreadFactory.getInstance()); transactionMQProducer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { Listlist = fp.stream().peek(it -> { it.setId(idUtils.nextIdStr()); it.setOrganId(user.getOrganId()); }).collect(Collectors.toList()); boolean flag = cmFpMapper.saveBatch(list) != 0; // 需要borker回查就返回unkow return flag ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 此方法 borker并不会回查,因为在本地事务就返回了回滚还是提交 // 当本地事务执行时返回unkow,borker回调用该该方法检查事务结果 return LocalTransactionState.COMMIT_MESSAGE; } }); transactionMQProducer.sendMessageInTransaction( new Message(TsitMQTopic.INDUSTRY_TOPIC, TsitMQTag.FP_CALLBACK, JSONUtil.toJsonStr(dto).getBytes(StandardCharsets.UTF_8)), null);
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)