RocketMQ(四)消息特殊处理

RocketMQ(四)消息特殊处理,第1张

1 消息顺序 1.1 背景

在订单流程场景中:创建订单、支付订单、订单完成这三个订单状态需要保证顺序执行,不能先支付订单,再创建订单。也不能订单完成,再去支付订单。

产生以上问题的原因大概率的是这两条消息不在同一个队列中,导致了并排消费,如下图,且后面的步骤先于前面。

利用rocketmq的顺序消息在rmq中,有一个顺序消息的概念,可以保证让发送的消息发送到一个消息队列中去(MessageQueue)

队列中有序,队列外无序

1.2 解决

把同一个订单的步骤放在同一个队列,这样就可以保证队列中有序,队列外无序

实现方法:

生产者:

for (OrderStep orderStep : orderStepList) {
   Message message = new Message("topic11", "tag11", orderStep.toString().getBytes(StandardCharsets.UTF_8));
   producer.send(message, new MessageQueueSelector() {
       @Override
       public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
           // 队列选择的方法 orderId对应一个确定队列
           int size = list.size();
           int i = orderStep.getOrderId() % size;  // 0 1 2 3
           MessageQueue messageQueue = list.get(i);
           return messageQueue;
       }
   }, null);
}

消费者

  • 是MessageListenerOrderly而不是MessageListenerConcurrently
// 消费者启动顺序监听,一个线程监听一个队列
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
        for (MessageExt messageExt : list) {
            System.out.println(new String(messageExt.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});

结果

2 消息事务 2.1 流程

首先两个基本概念,

  • Half Message(半消息):

暂时不能被 Consumer消费的消息。Producer已经把消息发送到 Broker端,但是此消息的状态被标记为不能投递,处于这种状态下的消息称为半消息。事实上,该状态下的消息会被放在一个叫做 RMQ_SYS_TRANS_HALF_TOPIC的主题下。

Producer端对它二次确认后,也就是 Commit之后,Consumer端才可以消费到;那么如果是Rollback,该消息则会被删除,永远不会被消费到。

  • 事务状态回查

可能因为网络原因、应用问题等,导致Producer端一直没有对这个半消息进行确认,那么这时候 Broker服务器会定时扫描这些半消息,主动找Producer端查询该消息的状态。

简而言之,RocketMQ事务消息的实现原理就是基于两阶段提交和事务状态回查,来决定消息最终是提交还是回滚的。

消息事务的流程图如下:

其中⑤⑥⑦是补偿消息

① producer给broker发送半消息
② broker会producer的半消息,响应OK
③ producer接收到响应消息后为确保消息不丢失,网本地数据库存一份。
④ producer给broker发送commit或rollback, 如果是commit,broker就会广播出去,如果是rollback, broker就会删除消息。
⑤ 如果broker迟迟没有等来④,就会回查事务。
⑥ producer检查事务
⑦基于检查结果做rollback或commit

2.1 事务消息发送示例

生产者Producer:

  • TransactionMQProducer 替换 DefaultMQProducer
  • 设置事务监听 setTransactionListener

下面是参考代码

public class Producer {
    public static void main(String[] args) throws MQClientException {
        TransactionMQProducer producer = new TransactionMQProducer("group1");
        producer.setNamesrvAddr("lcoalhost:9876");
        // 设置事务监听
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                // 正常情况下,把消息保存到数据库
                System.out.println("insert ...");
                // 提交事务 也可以 LocalTransactionState.ROLLBACK_MESSAGE
                return LocalTransactionState.COMMIT_MESSAGE;
            }

            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                return null;
            }
        });
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        String msg = "transaction hello world";
        Message message = new Message("topic13", "tag1", msg.getBytes(StandardCharsets.UTF_8));
        // 发送事务消息
        TransactionSendResult result = producer.sendMessageInTransaction(message, null);
        System.out.println(result);

        // 不要关闭生产者
    }
}

如果要回滚就要返回ROLLBACK_MESSAGE

return LocalTransactionState.ROLLBACK_MESSAGE;

消费者:

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("lcoalhost:9876");
        consumer.subscribe("topic13", "tag1");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt messageExt : list) {
                    System.out.println(new String(messageExt.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

2.2 事务补偿示例

生产者setTransactionListener方法进行如下修改:

  • executeLocalTransaction返回UNKNOW(broker一直收不到producer的确认状态)
  • checkLocalTransaction 处理事务补偿
 // 设置事务监听
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        // 正常情况下,把消息保存到数据库
        System.out.println("insert ...");
        // 提交事务 也可以 LocalTransactionState.ROLLBACK_MESSAGE
        return LocalTransactionState.UNKNOW;
    }

    /**
     * 执行事务补偿,broker主动问producer发不发
     * @param messageExt
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        System.out.println("执行事务补偿");
        return LocalTransactionState.COMMIT_MESSAGE;
    }
});

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/730761.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-27
下一篇 2022-04-27

发表评论

登录后才能评论

评论列表(0条)

保存