RocketMQ 发送事务消息 事务消息的执行流程

RocketMQ 发送事务消息 事务消息的执行流程,第1张

RocketMQ 发送事务消息 事务消息的执行流程
  1. 本地方法发送消息至borker
  2. borker 执行TransactionListener的方法。执行本地事务executeLocalTransaction
  3. 如果返回unkow状态,borker会回调checkLocalTransaction检查事务的结果,返回提交还是回滚
  4. borker设置消息对外可见,等待消费或主动推送

事务消息共有三种状态,提交状态、回滚状态、中间状态:

  • TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
  • TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
  • TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

事务的流程如下图简述

 实例代码

    
    @Bean
    @Lazy
    public TransactionMQProducer defaultTransactionMQProducer() throws MQClientException {
        TransactionMQProducer producer = new TransactionMQProducer(TsitMQGroup.PROVIDER_INDUSTRY_GROUP);
        producer.setNamesrvAddr(ipcProps.getRocketMQServer());
        producer.start();
        log.info("init ipc defaultTransactionMQProducer");
        return producer;
    }

发送事务消息

        // 自定义线程池
        transactionMQProducer.setExecutorService(ThreadFactory.getInstance());
        transactionMQProducer.setTransactionListener(new TransactionListener() {

            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                List list = fp.stream().peek(it -> {
                    it.setId(idUtils.nextIdStr());
                    it.setOrganId(user.getOrganId());
                }).collect(Collectors.toList());
               boolean flag = cmFpMapper.saveBatch(list) != 0;
                
                // 需要borker回查就返回unkow
               return flag ? LocalTransactionState.COMMIT_MESSAGE :                         
                LocalTransactionState.ROLLBACK_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                // 此方法 borker并不会回查,因为在本地事务就返回了回滚还是提交
                // 当本地事务执行时返回unkow,borker回调用该该方法检查事务结果
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        transactionMQProducer.sendMessageInTransaction(
                new Message(TsitMQTopic.INDUSTRY_TOPIC, TsitMQTag.FP_CALLBACK, JSONUtil.toJsonStr(dto).getBytes(StandardCharsets.UTF_8)),
                null);

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5669963.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存