目录
学习之前先了解一下什么是rocketMq的事务消息?
为什么有ack机制了,还提出一个事物消息。rocketmq事物消息能给我带来什么?
Springboot 集成RocketMq发送事物消息
学习之前先了解一下什么是rocketMq的事务消息?
秉承懒人的性格。这里直接看别人写的博客吧! rocketmq事务消息
大致的流程总结
1) 发送方向 RocketMQ 发送“待确认” (Prepare) 消息。
2 ) RocketMQ 将收到的“待确认” (一般写入一个 HalfTopic 主题
时第一阶段消息发送完成。
发送方开始执行本地事件逻辑.
3) 发送方根据事件执行结果向 RocketMQ 发送二次确认(Commit 还是 Rollback) 消息 RocketMQ 收到 Commit 则将第一阶段消息标记为可投递(这些
消息才会进入生产时发送实际的主题 RealTopic), 订阅方将能够收到该消息; 收到 Rollback 状态则删除第一阶段的消息, 订阅方接收不到该消息。
4) 如果出现异常情况, 步骤 3 提交的二次确认最终未到达 RocketMQ,服务器在经过固定时间段后将对“待确认” 消息、 发起回查请求.
5) 发送方收到消息回查请求后(如果发送一阶段消息的 Producer 不能工作, 回查请求将被发送到和 Producer 在同一个 Group 里的其他 Producer ),
通过检查对应消息的本地事件执行结果返回 Commit Roolback 状态。
为什么有ack机制了,还提出一个事物消息。rocketmq事物消息能给我带来什么?rocketMq三种发送方式对比
三种发送方式,除了单向发送,无发送错误反馈。其余二中都能知道消息是否发送到服务端。
当然。前二种也有返回发送消息失败。实际消息已经发送到mq中的情况。这个时候需要消费者自己保证幂等性。这里不考虑。
那么都有消息确认机制了。为啥还有事物消息。什么场景能使用到事物消息?
思前想后,自己模拟了一个场景。业务大概是消费成功后推送消息扣减库存。如果简单使用消息确认机制。可能会有一下情况。
1.消费成功。发送队列成功 。没毛病,大概率这个情况
2.消费失败。直接回滚了也不会推送消息。
3.消费成功,发送队列返回失败。额。这怎么处理。
回滚消费业务?人家都付款成功了就因为发送队列失败。你把吃到肉吐出去。这个显然不可能。
不处理?不处理不扣库存,不发积分。你这么跟你领导回答看你们领导打不打你。
发送异常,插入发送失败表?这个好像可能。而且好像很多也是这么做的,这么做有问题吗?大概率没问题。不过也有可能insert失败表插入失败的情况。不过是小概率。
这里事物消息的场景就体现出来了。事物消息。也可以理解为可靠消息。或者一种解决方案。至少这里我是这么理解的
那么一言不合上代码吧!
Springboot 集成RocketMq发送事物消息Pom引入 这里有boot-starter不就直接使用原生的client了如果先要看看实现,直接搜下面的入口类
org.apache.rocketmq rocketmq-spring-boot-starter2.1.1
言归正传
模拟业务,订单交易成功,订单表order_info插入数据,order_slip 流水表插入记录。并且推送消息队列。
public interface IOrderInfoService extends IService{ void insertOrderInfo(OrderInfo orderInfo); }
public interface IOrderSlipService extends IService{ boolean isExistTx(String orderId); }
@Service public class OrderInfoServiceImpl extends ServiceImplimplements IOrderInfoService { @Resource private IOrderSlipService orderSlipService; @Override @Transactional(rollbackFor = Exception.class) public void insertOrderInfo(OrderInfo orderInfo) { baseMapper.insert(orderInfo); OrderSlip orderSlip = new OrderSlip(); orderSlip.setOrderId(orderInfo.getOrderId()); orderSlip.setStatus(1); orderSlipService.save(orderSlip); } }
@Service public class OrderSlipServiceImpl extends ServiceImplimplements IOrderSlipService { @Override public boolean isExistTx(String orderId) { QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.eq("order_id",orderId); queryWrapper.eq("status",1); OrderSlip orderSlip = baseMapper.selectOne(queryWrapper); return (orderSlip!=null); } }
rocketMqTransactionListener实现
@Component @RocketMQTransactionListener public class RocketMqTransactionListenerImpl implements RocketMQLocalTransactionListener{ @Resource private IOrderInfoService orderInfoService; @Resource private IOrderSlipService orderSlipService; @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) { try{ // 执行业务逻辑 控制本地事务 String jsonString = new String((byte[]) message.getPayload()); // 比如 订单交易-插入流水 (同一事务) OrderInfo orderInfo = JSON.parseObject(jsonString, OrderInfo.class); orderInfoService.insertOrderInfo(orderInfo); return RocketMQLocalTransactionState.COMMIT; }catch (Exception e){ System.out.println(e); return RocketMQLocalTransactionState.ROLLBACK; } } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { RocketMQLocalTransactionState state; String jsonString = new String((byte[]) msg.getPayload()); OrderInfo orderInfo = JSON.parseObject(jsonString, OrderInfo.class); boolean existTx = orderSlipService.isExistTx(orderInfo.getOrderId()); if(existTx){ state = RocketMQLocalTransactionState.COMMIT; }else{ state = RocketMQLocalTransactionState.UNKNOWN; } return state; } }
Controller实现
@RestController @RequestMapping("/mq/product") public class RocketMqProductController { @Resource private RocketMQTemplate rocketMQTemplate; @RequestMapping("/test") public void myTestTran(){ OrderInfo orderInfo = new OrderInfo(); orderInfo.setGoodId(11); orderInfo.setNum(2); String orderId = UUID.randomUUID().toString().replaceAll("-",""); orderInfo.setOrderId(orderId); Messagemessage = MessageBuilder.withPayload(JSON.toJSonString(orderInfo)).build(); TransactionSendResult tx_product_msg = rocketMQTemplate.sendMessageInTransaction("tx_product_msg", message, null); System.out.println("发送成功"); } }
这里就是一个demo。以及大概的实现。当然生产中不可能这么简单需要具体按照业务去考虑。rocketMq只是提供了一个模板。
简单表述一下,就是如果业务成功之后。rocket commit失败。会有定时任务调用流水接口查看订单是否成功,当然现实中不可能只判断有没有这个记录这么简单。
这只是本人的观点。如果有错误请大家指出。再次感谢
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)