背景做过电商系统的人都会遇到一个场景,就是下了订单之后,订单支付会有一个有效期,超时订单自动关闭。实现的技术有很多,再次讨论基于RabbitMQ进行实现
思路这个是基于RabbitMQ的延迟队列实现的,那需要讨论下什么是延迟队列
延迟队列延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不
想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费 。PS: 在 AMQP 协议中,或者 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过 DLX(死信队列) 和 TTL 模拟出延迟队列的功能,所以需要讨论下什么是死信队列
死信队列DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当
消息在一个队列中变成死信( dead message )之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。
消息一般在一下情况下会转换成死信队列:
- 消息被拒绝( Basic.Reject/Basic.Nack ),井且设置 requeue 参数为 false;
- 消息过期(TTL);
- 队列达到最大长度。
我们这里采用消息过期的方式进行分析实现
PS: 建立两个队列(可以是不同交换器),将队列A中信息设置成一个过期时间,当消息过期之后,会自动投递到队列B,那么监听队列B的消费根据再进行接下来的业务处理
实现
maven配置
org.springframework.boot spring-boot-starter-actuatororg.springframework.boot spring-boot-starter-webcom.baomidou mybatis-plus-boot-starter3.3.1.tmp com.github.xiaoymin knife4j-spring-boot-starter2.0.1 guava com.google.guava com.google.guava guava20.0 org.springframework.boot spring-boot-starter-amqporg.springframework.amqp spring-rabbit-testtest mysql mysql-connector-java5.1.29 org.projectlombok lomboktrue
基于Springboot的RabbitMQ配置
spring: datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://192.168.116.128:3306/store?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai username: root password: root hikari: pool-name: coreHikariPool maximum-pool-size: 12 connection-timeout: 30000 minimum-idle: 10 idle-timeout: 500000 max-lifetime: 540000 connection-test-query: SELECT 1 auto-commit: true rabbitmq: addresses: 192.168.116.143:5672,192.168.116.144:5672,192.168.116.145:5672 username: admin password: admin$
配置启动时候创建的队列
package com.example.order.core.config; public enum QueueEnum { QUEUE_AUTO_CANCEL("vv_x", "zz_test", "key"), QUEUE_DELAY_ORDER("vv_x_ttl", "zz_test_ttl", "key_ttl"); QueueEnum(String exchange, String queue, String routeKey) { this.exchange = exchange; this.queue = queue; this.routeKey = routeKey; } public String getExchange() { return exchange; } public String getQueue() { return queue; } public String getRouteKey() { return routeKey; } private String exchange; public String queue; private String routeKey; }
package com.example.order.core.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMqConfig { @Bean DirectExchange orderDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_AUTO_CANCEL.getExchange()) .durable(true) .build(); } @Bean DirectExchange orderTtlDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_DELAY_ORDER.getExchange()) .durable(true) .build(); } @Bean public Queue orderQueue() { return new Queue(QueueEnum.QUEUE_AUTO_CANCEL.getQueue()); } @Bean public Queue orderTtlQueue() { return QueueBuilder .durable(QueueEnum.QUEUE_DELAY_ORDER.getQueue()) //到期后转发的交换机 .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_AUTO_CANCEL.getExchange()) //到期后转发的路由键 .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_AUTO_CANCEL.getRouteKey()) .build(); } @Bean Binding orderBinding(DirectExchange orderDirect, Queue orderQueue) { return BindingBuilder .bind(orderQueue) .to(orderDirect) .with(QueueEnum.QUEUE_AUTO_CANCEL.getRouteKey()); } @Bean Binding orderTtlBinding(DirectExchange orderTtlDirect, Queue orderTtlQueue) { return BindingBuilder .bind(orderTtlQueue) .to(orderTtlDirect) .with(QueueEnum.QUEUE_DELAY_ORDER.getRouteKey()); } }
消息生产者
package com.example.order.core.listener; import com.example.order.core.config.QueueEnum; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Slf4j @Component public class OrderMessageSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(Long orderId, final long delayTimes) { //给延迟队列发送消息 rabbitTemplate.convertAndSend(QueueEnum.QUEUE_DELAY_ORDER.getExchange(), QueueEnum.QUEUE_DELAY_ORDER.getRouteKey() , orderId, message -> { //给消息设置延迟毫秒值 message.getMessageProperties().setExpiration(String.valueOf(delayTimes)); return message; }); log.info("send delay message orderId:{}", orderId); } }
消息消费者
package com.example.order.core.listener; import com.example.order.core.service.ShopOrderOperateService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Slf4j @Component public class AutoCancelListener { @Autowired private ShopOrderOperateService operateService; @RabbitListener(queues = "zz_test") @RabbitHandler public void handle(Long orderId) { boolean b = operateService.cancelOrder(orderId); log.info("receive delay message orderId:{},and auto cancel flag:{}", orderId, b); } }
真正实现自动取消订单的实现类及所有依赖
package com.example.order.core.service; import com.example.order.core.entity.ShopOrderDO; public interface ShopOrderOperateService { boolean save(ShopOrderDO dto); boolean cancelOrder(Long orderId); }
package com.example.order.core.entity; import com.baomidou.mybatisplus.annotation.TableName; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; @Data @TableName("shop_order") @ApiModel(value = "实体") public class ShopOrderDO extends baseDO { private static final long serialVersionUID = -8985653170140721455L; @ApiModelProperty(value = "订单状态", notes = "1:待支付,2:支付成功,3:订单超时 4:订单支付失败") private Integer status; @ApiModelProperty(value = "重要成都", notes = "") private Integer importance; @ApiModelProperty(value = "备注", notes = "") private String remark; @Override public String toString() { return "ShopOrderDO{" + " id=" + id + ", status=" + status + '}' + "过了:" + (System.currentTimeMillis() - this.getGmtCreate().getTime()) / 1000 + "s"; } }
//ShopOrderMapper.xml
id, gmt_create, creator, gmt_modified, modifier, status, is_deleted, importance,remark
package com.example.order.core.mapper; import com.baomidou.mybatisplus.core.mapper.baseMapper; import com.example.order.core.entity.ShopOrderDO; public interface ShopOrderMapper extends baseMapper{ }
package com.example.order.core.service; import com.baomidou.mybatisplus.extension.service.IService; import com.example.order.core.entity.ShopOrderDO; public interface ShopOrderService extends IService{ }
package com.example.order.core.service.impl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.example.order.core.entity.ShopOrderDO; import com.example.order.core.mapper.ShopOrderMapper; import com.example.order.core.service.ShopOrderService; import org.springframework.stereotype.Service; @Service public class ShopOrderServiceImpl extends ServiceImplimplements ShopOrderService { }
package com.example.order.core.service; import com.example.order.core.entity.ShopOrderDO; public interface ShopOrderOperateService { boolean save(ShopOrderDO dto); boolean cancelOrder(Long orderId); }
package com.example.order.core.service.impl; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.example.order.core.entity.ShopOrderDO; import com.example.order.core.listener.OrderMessageSender; import com.example.order.core.service.ShopOrderOperateService; import com.example.order.core.service.ShopOrderService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.Date; @Slf4j @Service public class ShopOrderOperateServiceImpl implements ShopOrderOperateService { @Resource OrderMessageSender sender; @Autowired private ShopOrderService ext; @Override public boolean save(ShopOrderDO dto) { try { dto.setStatus(1); boolean save = ext.save(dto); if (save) { sender.sendMessage(dto.getId(), 1 * 60 * 1000); } return true; } catch (Exception e) { log.error(".....{}", e); } return false; } @Override public boolean cancelOrder(Long orderId) { //把待支付中的订单设置成订单失效 LambdaUpdateWrapperupdate = Wrappers.lambdaUpdate(); update.eq(ShopOrderDO::getId, orderId); update.eq(ShopOrderDO::getStatus, 1); update.set(ShopOrderDO::getStatus, 3); update.set(ShopOrderDO::getGmtModified, new Date()); update.set(ShopOrderDO::getModifier, "auto"); update.set(ShopOrderDO::getRemark, "过期了"); return ext.update(update); } }
模拟新增订单的接口
package com.example.order.core.controller; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.example.order.core.entity.ShopOrderDO; import com.example.order.core.service.ShopOrderOperateService; import com.example.order.core.service.ShopOrderService; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.*; import java.util.Date; import java.util.List; @RestController @Api(tags = "-相关接口") @RequestMapping(value = "/shop/order", produces = MediaType.APPLICATION_JSON_VALUE) public class ShopOrderController { @Autowired private ShopOrderOperateService operateService; @Autowired private ShopOrderService ext; @GetMapping("list") @ApiOperation(value = "查询集合") public ListlistShopOrderServiceByPage(ShopOrderDO query) { LambdaQueryWrapper wrapper = Wrappers.lambdaQuery(query); return ext.list(wrapper); } @GetMapping("{id}") @ApiOperation(value = "获取某一实体") public ShopOrderDO getShopOrderServiceDetails(@PathVariable Long id) { return ext.getById(id); } @PostMapping @ApiOperation(value = "新增数据") public boolean saveShopOrderService(@RequestBody ShopOrderDO dto) { dto.setGmtCreate(new Date()); dto.setGmtModified(new Date()); dto.setCreator("初始化"); dto.setRemark("新建"); return operateService.save(dto); } @PutMapping("{id}") @ApiOperation(value = "修改数据") public boolean modifyShopOrderService(@RequestBody ShopOrderDO dto, @PathVariable Long id) { dto.setId(id); dto.setGmtModified(new Date()); dto.setModifier("更新"); dto.setRemark("手动更新"); return ext.updateById(dto); } }
PS: 消费者拿到消息之后,可以根据消息获取到订单信息,根据订单信息进行 *** 作是过期还是其他状态
当然,如果要确保订单消息一定不会丢失,还可以使用RabbitMQ的发送确认功能,这里略过不提
记录下以备后用
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)