在订单流程场景中:创建订单、支付订单、订单完成这三个订单状态需要保证顺序执行,不能先支付订单,再创建订单。也不能订单完成,再去支付订单。
产生以上问题的原因大概率的是这两条消息不在同一个队列中,导致了并排消费,如下图,且后面的步骤先于前面。
利用rocketmq的顺序消息在rmq中,有一个顺序消息的概念,可以保证让发送的消息发送到一个消息队列中去(MessageQueue)
队列中有序,队列外无序
把同一个订单的步骤放在同一个队列,这样就可以保证队列中有序,队列外无序
实现方法:
生产者:
for (OrderStep orderStep : orderStepList) {
Message message = new Message("topic11", "tag11", orderStep.toString().getBytes(StandardCharsets.UTF_8));
producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
// 队列选择的方法 orderId对应一个确定队列
int size = list.size();
int i = orderStep.getOrderId() % size; // 0 1 2 3
MessageQueue messageQueue = list.get(i);
return messageQueue;
}
}, null);
}
消费者
- 是MessageListenerOrderly而不是MessageListenerConcurrently
// 消费者启动顺序监听,一个线程监听一个队列
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt messageExt : list) {
System.out.println(new String(messageExt.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
结果
首先两个基本概念,
- Half Message(半消息):
暂时不能被 Consumer
消费的消息。Producer
已经把消息发送到 Broker
端,但是此消息的状态被标记为不能投递,处于这种状态下的消息称为半消息。事实上,该状态下的消息会被放在一个叫做 RMQ_SYS_TRANS_HALF_TOPIC
的主题下。
当 Producer
端对它二次确认后,也就是 Commit
之后,Consumer
端才可以消费到;那么如果是Rollback,该消息则会被删除,永远不会被消费到。
- 事务状态回查
可能因为网络原因、应用问题等,导致Producer端一直没有对这个半消息进行确认,那么这时候 Broker
服务器会定时扫描这些半消息,主动找Producer
端查询该消息的状态。
简而言之,RocketMQ
事务消息的实现原理就是基于两阶段提交和事务状态回查,来决定消息最终是提交还是回滚的。
消息事务的流程图如下:
其中⑤⑥⑦是补偿消息
① producer给broker发送半消息
② broker会producer的半消息,响应OK
③ producer接收到响应消息后为确保消息不丢失,网本地数据库存一份。
④ producer给broker发送commit或rollback, 如果是commit,broker就会广播出去,如果是rollback, broker就会删除消息。
⑤ 如果broker迟迟没有等来④,就会回查事务。
⑥ producer检查事务
⑦基于检查结果做rollback或commit
生产者Producer:
- TransactionMQProducer 替换 DefaultMQProducer
- 设置事务监听 setTransactionListener
下面是参考代码
public class Producer {
public static void main(String[] args) throws MQClientException {
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("lcoalhost:9876");
// 设置事务监听
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// 正常情况下,把消息保存到数据库
System.out.println("insert ...");
// 提交事务 也可以 LocalTransactionState.ROLLBACK_MESSAGE
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
return null;
}
});
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
String msg = "transaction hello world";
Message message = new Message("topic13", "tag1", msg.getBytes(StandardCharsets.UTF_8));
// 发送事务消息
TransactionSendResult result = producer.sendMessageInTransaction(message, null);
System.out.println(result);
// 不要关闭生产者
}
}
如果要回滚就要返回ROLLBACK_MESSAGE
return LocalTransactionState.ROLLBACK_MESSAGE;
消费者:
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("lcoalhost:9876");
consumer.subscribe("topic13", "tag1");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
System.out.println(new String(messageExt.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
2.2 事务补偿示例
生产者setTransactionListener
方法进行如下修改:
- executeLocalTransaction返回UNKNOW(broker一直收不到producer的确认状态)
- checkLocalTransaction 处理事务补偿
// 设置事务监听
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// 正常情况下,把消息保存到数据库
System.out.println("insert ...");
// 提交事务 也可以 LocalTransactionState.ROLLBACK_MESSAGE
return LocalTransactionState.UNKNOW;
}
/**
* 执行事务补偿,broker主动问producer发不发
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("执行事务补偿");
return LocalTransactionState.COMMIT_MESSAGE;
}
});
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)