RocketMQ发送事务消息

RocketMQ发送事务消息,第1张

RocketMQ发送事务消息

 rocketmq发送事务消息的过程为

1.发送half消息

2.消息队列返回发送结果

3.执行本地事务

4.提交/回滚/未知

5.如果第四步为未知,则消息队列会反查本地事务,本地事务查询后再通知消息队列最终提交或回滚

生产者

public class Producer {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {

        TransactionMQProducer producer = new TransactionMQProducer("group_09");
        producer.setNamesrvAddr("localhost:9876");

        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                System.out.println("执行本地事务");
                return LocalTransactionState.UNKNOW;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("执行补偿事务");
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        });

        producer.start();

        Message msg =
                new Message("topic14", "tag1",
                        ("Hello RocketMQ transaction").getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.sendMessageInTransaction(msg, null);
        System.out.println("发送成功" + sendResult);
    }
}

消费者

public class Consumer {

    public static void main(String[] args) throws MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_03");

        consumer.setNamesrvAddr("localhost:9876");

        consumer.subscribe("topic14", "*");

        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt messageExt : list) {
                    System.out.println(messageExt);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5634952.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存