Order timeout based on RabbitMQ: notes for future reference

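Background

Anyone who has built an e-commerce system has met this scenario: once an order is placed, payment must be completed within a validity period, and an order that times out is closed automatically. There are many ways to implement this; here we discuss an implementation based on RabbitMQ.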

Approach

The implementation relies on a RabbitMQ delay queue, so we first need to look at what a delay queue is.

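Delay queue

A delay queue stores delayed messages. A "delayed message" is one that, once sent, should not reach the consumer immediately; the consumer can only receive and consume it after a specified time has passed.

PS: Neither the AMQP protocol nor RabbitMQ itself supports delay queues directly, but the feature can be emulated with a DLX (dead-letter exchange) and TTL, so we next need to look at what a dead-letter queue is.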

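Dead-letter queue

DLX stands for Dead-Letter-Exchange, usually called a dead-letter exchange and sometimes a dead-letter mailbox. When a message in a queue becomes a dead letter (dead message), it can be republished to another exchange. That exchange is the DLX, and a queue bound to the DLX is called a dead-letter queue.

A message generally becomes a dead letter in the following cases: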

  • The message is rejected (Basic.Reject/Basic.Nack) with the requeue parameter set to false;
  • The message expires (TTL);
  • The queue reaches its maximum length.
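The three triggers can be sketched with a small in-memory model. This is plain Java rather than RabbitMQ client code; the class name DeadLetterModel and its methods are hypothetical, and the overflow rule assumes the drop-head behavior that RabbitMQ applies by default when a length limit is reached.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical in-memory model of the three dead-letter triggers. */
class DeadLetterModel {
    private final int maxLength;
    private final Deque<String> queue = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>();

    DeadLetterModel(int maxLength) {
        this.maxLength = maxLength;
    }

    /** Enqueues a message; on overflow the oldest message is dead-lettered. */
    void publish(String message) {
        queue.addLast(message);
        while (queue.size() > maxLength) {
            deadLetters.add(queue.pollFirst());
        }
    }

    /** The consumer rejects the head message; it is dead-lettered only when requeue is false. */
    void rejectHead(boolean requeue) {
        String message = queue.pollFirst();
        if (message == null) {
            return;
        }
        if (requeue) {
            queue.addFirst(message);
        } else {
            deadLetters.add(message);
        }
    }

    /** The TTL of the head message has elapsed, so it is dead-lettered. */
    void expireHead() {
        String message = queue.pollFirst();
        if (message != null) {
            deadLetters.add(message);
        }
    }

    List<String> deadLetters() {
        return new ArrayList<>(deadLetters);
    }

    int size() {
        return queue.size();
    }
}
```

With a limit of 2, publishing a third message pushes the oldest one out; rejecting with requeue set to true leaves the dead-letter list unchanged.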
     

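Here we analyze and implement the message-expiry approach.

PS: Create two queues (they may be on different exchanges) and give the messages in queue A an expiration time. When a message expires, it is automatically delivered to queue B, and the consumer listening on queue B then carries out the follow-up business processing.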
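The queue A to queue B flow can be pictured with a minimal plain-Java model. It is a sketch, not RabbitMQ code: TtlQueueModel and its methods are hypothetical names, and time is passed in explicitly so the behavior is easy to follow. It assumes expiry is only checked at the head of queue A, which is how RabbitMQ treats per-message TTL on classic queues.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model: queue A holds messages with a TTL, queue B receives them on expiry. */
class TtlQueueModel {
    /** An order ID plus the absolute time (in milliseconds) at which the message expires. */
    static final class Entry {
        final long orderId;
        final long expiresAtMillis;

        Entry(long orderId, long expiresAtMillis) {
            this.orderId = orderId;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Deque<Entry> queueA = new ArrayDeque<>(); // TTL queue, nobody consumes from it
    private final List<Long> queueB = new ArrayList<>();    // dead-letter target, read by the consumer

    /** Publishes an order ID to queue A with a per-message TTL. */
    void publish(long orderId, long nowMillis, long ttlMillis) {
        queueA.addLast(new Entry(orderId, nowMillis + ttlMillis));
    }

    /** Moves every expired message at the head of queue A over to queue B. */
    void expire(long nowMillis) {
        while (!queueA.isEmpty() && queueA.peekFirst().expiresAtMillis <= nowMillis) {
            queueB.add(queueA.pollFirst().orderId);
        }
    }

    /** Order IDs that have arrived in queue B so far, in arrival order. */
    List<Long> deliveredToB() {
        return new ArrayList<>(queueB);
    }
}
```

Because only the head is checked, a message with a short TTL queued behind one with a long TTL would wait for the long one. Using the same delay for every order, as this article does, avoids that ordering problem.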

Implementation

Maven configuration

     
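        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.3.1.tmp</version>
        </dependency>
        <dependency>
            <groupId>com.github.xiaoymin</groupId>
            <artifactId>knife4j-spring-boot-starter</artifactId>
            <version>2.0.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>20.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.29</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>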

Spring Boot-based RabbitMQ configuration

spring:
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://192.168.116.128:3306/store?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
    username: root
    password: root
    hikari:
      pool-name: coreHikariPool
      maximum-pool-size: 12
      connection-timeout: 30000
      minimum-idle: 10
      idle-timeout: 500000
      max-lifetime: 540000
      connection-test-query: SELECT 1
      auto-commit: true
  rabbitmq:
    addresses: 192.168.116.143:5672,192.168.116.144:5672,192.168.116.145:5672
    username: admin
    password: admin$

Queues created at startup

package com.example.order.core.config;


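/**
 * Exchange, queue and routing-key names used by the order-timeout flow.
 */
public enum QueueEnum {

    /** Queue the consumer listens on; expired messages are dead-lettered here. */
    QUEUE_AUTO_CANCEL("vv_x", "zz_test", "key"),

    /** Delay (TTL) queue; messages wait here until they expire. */
    QUEUE_DELAY_ORDER("vv_x_ttl", "zz_test_ttl", "key_ttl");

    private final String exchange;
    private final String queue;
    private final String routeKey;

    QueueEnum(String exchange, String queue, String routeKey) {
        this.exchange = exchange;
        this.queue = queue;
        this.routeKey = routeKey;
    }

    public String getExchange() {
        return exchange;
    }

    public String getQueue() {
        return queue;
    }

    public String getRouteKey() {
        return routeKey;
    }
}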
package com.example.order.core.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitMqConfig {

    
    /** Exchange bound to the queue that the consumer actually listens on. */
    @Bean
    DirectExchange orderDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.QUEUE_AUTO_CANCEL.getExchange())
                .durable(true)
                .build();
    }

    /** Exchange bound to the delay (TTL) queue. */
    @Bean
    DirectExchange orderTtlDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.QUEUE_DELAY_ORDER.getExchange())
                .durable(true)
                .build();
    }

    /** Queue that the consumer actually listens on. */
    @Bean
    public Queue orderQueue() {
        return new Queue(QueueEnum.QUEUE_AUTO_CANCEL.getQueue());
    }

    /** Delay queue: it has no consumer, so its messages sit here until they expire. */
    @Bean
    public Queue orderTtlQueue() {
        return QueueBuilder
                .durable(QueueEnum.QUEUE_DELAY_ORDER.getQueue())
                // Exchange that expired messages are forwarded to
                .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_AUTO_CANCEL.getExchange())
                // Routing key used when expired messages are forwarded
                .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_AUTO_CANCEL.getRouteKey())
                .build();
    }

    /** Binds the consumer queue to its exchange. */
    @Bean
    Binding orderBinding(DirectExchange orderDirect, Queue orderQueue) {
        return BindingBuilder
                .bind(orderQueue)
                .to(orderDirect)
                .with(QueueEnum.QUEUE_AUTO_CANCEL.getRouteKey());
    }

    /** Binds the delay queue to its exchange. */
    @Bean
    Binding orderTtlBinding(DirectExchange orderTtlDirect, Queue orderTtlQueue) {
        return BindingBuilder
                .bind(orderTtlQueue)
                .to(orderTtlDirect)
                .with(QueueEnum.QUEUE_DELAY_ORDER.getRouteKey());
    }
}

Message producer

package com.example.order.core.listener;

import com.example.order.core.config.QueueEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class OrderMessageSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    
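    /**
     * Publishes the order ID to the delay queue with a per-message TTL.
     *
     * @param orderId    ID of the order to check once the delay has elapsed
     * @param delayTimes delay in milliseconds
     */
    public void sendMessage(Long orderId, final long delayTimes) {
        // Send the message to the delay queue.
        rabbitTemplate.convertAndSend(QueueEnum.QUEUE_DELAY_ORDER.getExchange(), QueueEnum.QUEUE_DELAY_ORDER.getRouteKey()
                , orderId, message -> {
                    // Set the per-message TTL in milliseconds.
                    message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                    return message;
                });
        log.info("send delay message orderId:{}", orderId);
    }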
}
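The expiration property is a string holding the TTL in milliseconds, which is why the sender calls String.valueOf on the delay. A small helper can make the unit explicit and, if the message is published some time after the order was created, shorten the delay to whatever remains of the payment window. The class Expiration and both method names are hypothetical and not part of the article's project; this is a minimal sketch using only java.time.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper for building the per-message expiration value. */
class Expiration {
    /** Returns the delay as a decimal string of milliseconds. */
    static String of(Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative");
        }
        return String.valueOf(delay.toMillis());
    }

    /** Time left in the payment window at the given instant, never below zero. */
    static Duration remaining(Instant createdAt, Duration window, Instant now) {
        Duration left = Duration.between(now, createdAt.plus(window));
        return left.isNegative() ? Duration.ZERO : left;
    }
}
```

For example, Expiration.of(Duration.ofMinutes(1)) yields the same value as the 1 * 60 * 1000 used in this article, and the result could be passed to setExpiration in place of String.valueOf(delayTimes).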

Message consumer

package com.example.order.core.listener;

import com.example.order.core.service.ShopOrderOperateService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class AutoCancelListener {
    @Autowired
    private ShopOrderOperateService operateService;

    @RabbitListener(queues = "zz_test")
    @RabbitHandler
    public void handle(Long orderId) {
        boolean b = operateService.cancelOrder(orderId);
        log.info("receive delay message orderId:{},and auto cancel flag:{}", orderId, b);
    }
}

The class that actually cancels the order, and all of its dependencies

 

package com.example.order.core.service;


import com.example.order.core.entity.ShopOrderDO;


public interface ShopOrderOperateService {
    
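    /** Saves a new order and schedules its timeout message. */
    boolean save(ShopOrderDO dto);

    /** Cancels the order if it is still awaiting payment. */
    boolean cancelOrder(Long orderId);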
}

 

package com.example.order.core.entity;

import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;


@Data
@TableName("shop_order")
@ApiModel(value = "Entity")
public class ShopOrderDO extends BaseDO {

    private static final long serialVersionUID = -8985653170140721455L;

    @ApiModelProperty(value = "Order status", notes = "1: awaiting payment, 2: paid, 3: timed out, 4: payment failed")
    private Integer status;

    @ApiModelProperty(value = "Importance level", notes = "")
    private Integer importance;

    @ApiModelProperty(value = "Remark", notes = "")
    private String remark;

    @Override
    public String toString() {
        // Also prints the number of seconds elapsed since the order was created.
        return "ShopOrderDO{" +
                " id=" + id +
                ", status=" + status +
                '}' + " elapsed: " + (System.currentTimeMillis() - this.getGmtCreate().getTime()) / 1000 + "s";
    }
}

 ShopOrderMapper.xml (column list; the surrounding XML markup is not preserved)

        id, gmt_create, creator, gmt_modified, modifier, status, is_deleted, importance, remark

package com.example.order.core.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.order.core.entity.ShopOrderDO;


public interface ShopOrderMapper extends BaseMapper<ShopOrderDO> {

}
package com.example.order.core.service;

import com.baomidou.mybatisplus.extension.service.IService;
import com.example.order.core.entity.ShopOrderDO;


public interface ShopOrderService extends IService<ShopOrderDO> {

}
package com.example.order.core.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.order.core.entity.ShopOrderDO;
import com.example.order.core.mapper.ShopOrderMapper;
import com.example.order.core.service.ShopOrderService;
import org.springframework.stereotype.Service;


@Service
public class ShopOrderServiceImpl extends ServiceImpl<ShopOrderMapper, ShopOrderDO> implements ShopOrderService {

}
package com.example.order.core.service.impl;

import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.order.core.entity.ShopOrderDO;
import com.example.order.core.listener.OrderMessageSender;
import com.example.order.core.service.ShopOrderOperateService;
import com.example.order.core.service.ShopOrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Date;


@Slf4j
@Service
public class ShopOrderOperateServiceImpl implements ShopOrderOperateService {
    @Resource
    OrderMessageSender sender;
    @Autowired
    private ShopOrderService ext;

    @Override
    public boolean save(ShopOrderDO dto) {
        try {
            // 1 = awaiting payment
            dto.setStatus(1);
            boolean saved = ext.save(dto);
            if (saved) {
                // Schedule the timeout check one minute after the order is saved.
                sender.sendMessage(dto.getId(), 1 * 60 * 1000);
            }
            return saved;
        } catch (Exception e) {
            log.error("failed to save the order or send the delay message", e);
        }
        return false;
    }

    @Override
    public boolean cancelOrder(Long orderId) {
        // Mark an order that is still awaiting payment (1) as timed out (3).
        LambdaUpdateWrapper<ShopOrderDO> update = Wrappers.lambdaUpdate();
        update.eq(ShopOrderDO::getId, orderId);
        update.eq(ShopOrderDO::getStatus, 1);
        update.set(ShopOrderDO::getStatus, 3);
        update.set(ShopOrderDO::getGmtModified, new Date());
        update.set(ShopOrderDO::getModifier, "auto");
        update.set(ShopOrderDO::getRemark, "expired");
        return ext.update(update);
    }
}
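The status = 1 condition in cancelOrder is what keeps the timeout safe: an order that has already been paid is left alone, and a redelivered message changes nothing the second time. The effect can be shown with an in-memory stand-in for the conditional update. OrderStatusStore is a hypothetical class that replaces the database with a map; it is a sketch of the idea, not the article's persistence code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for "UPDATE shop_order SET status = 3 WHERE id = ? AND status = 1". */
class OrderStatusStore {
    static final int PENDING = 1;
    static final int PAID = 2;
    static final int TIMED_OUT = 3;

    private final Map<Long, Integer> statusById = new ConcurrentHashMap<>();

    void put(long orderId, int status) {
        statusById.put(orderId, status);
    }

    Integer get(long orderId) {
        return statusById.get(orderId);
    }

    /** Marks the order as timed out only if it is still pending; returns whether anything changed. */
    boolean cancelIfPending(long orderId) {
        return statusById.replace(orderId, PENDING, TIMED_OUT);
    }
}
```

The boolean result plays the same role as the flag logged by AutoCancelListener: true when the order was actually cancelled, false when it was already paid, already cancelled, or unknown.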

An endpoint that simulates creating an order

package com.example.order.core.controller;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.order.core.entity.ShopOrderDO;
import com.example.order.core.service.ShopOrderOperateService;
import com.example.order.core.service.ShopOrderService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;

import java.util.Date;
import java.util.List;


@RestController
@Api(tags = "Order-related endpoints")
@RequestMapping(value = "/shop/order", produces = MediaType.APPLICATION_JSON_VALUE)
public class ShopOrderController {

    @Autowired
    private ShopOrderOperateService operateService;


    @Autowired
    private ShopOrderService ext;

    @GetMapping("list")
    @ApiOperation(value = "Query a list")
    public List<ShopOrderDO> listShopOrderServiceByPage(ShopOrderDO query) {
        LambdaQueryWrapper<ShopOrderDO> wrapper = Wrappers.lambdaQuery(query);
        return ext.list(wrapper);
    }

    @GetMapping("{id}")
    @ApiOperation(value = "Get a single entity")
    public ShopOrderDO getShopOrderServiceDetails(@PathVariable Long id) {
        return ext.getById(id);
    }

    @PostMapping
    @ApiOperation(value = "Create a record")
    public boolean saveShopOrderService(@RequestBody ShopOrderDO dto) {
        dto.setGmtCreate(new Date());
        dto.setGmtModified(new Date());
        dto.setCreator("init");
        dto.setRemark("created");
        // Saves the order and sends the delay message that will later trigger the timeout check.
        return operateService.save(dto);
    }

    @PutMapping("{id}")
    @ApiOperation(value = "Update a record")
    public boolean modifyShopOrderService(@RequestBody ShopOrderDO dto, @PathVariable Long id) {
        dto.setId(id);
        dto.setGmtModified(new Date());
        dto.setModifier("update");
        dto.setRemark("manual update");
        return ext.updateById(dto);
    }
}

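 PS: After receiving the message, the consumer can load the order from the order ID it carries and then decide, based on the order's current state, whether to mark it as expired or handle it some other way.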

Of course, to make sure the order message is never lost, you can also use RabbitMQ's publisher confirms; that is not covered here.
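Although the article leaves publisher confirms out, the bookkeeping behind them is small enough to sketch. The broker acknowledges each published message by sequence number, optionally confirming everything up to that number at once, and whatever stays unconfirmed is a candidate for resending. The class below is a hypothetical plain-Java model of that tracking and makes no calls to RabbitMQ or Spring AMQP; wiring it to real confirm callbacks is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical tracker for messages that were published but not yet confirmed. */
class OutstandingConfirms {
    // Publish sequence number -> order ID carried by that message
    private final ConcurrentNavigableMap<Long, Long> orderIdBySeq = new ConcurrentSkipListMap<>();

    /** Records a message right after it has been published. */
    void published(long seqNo, long orderId) {
        orderIdBySeq.put(seqNo, orderId);
    }

    /** Handles a broker ack: one entry, or every entry up to and including seqNo when multiple is true. */
    void acked(long seqNo, boolean multiple) {
        if (multiple) {
            orderIdBySeq.headMap(seqNo, true).clear();
        } else {
            orderIdBySeq.remove(seqNo);
        }
    }

    /** Order IDs whose messages are still waiting for a confirm, in publish order. */
    List<Long> unconfirmedOrderIds() {
        return new ArrayList<>(orderIdBySeq.values());
    }
}
```

A periodic job could resend the delay message for any order ID that stays in unconfirmedOrderIds() for too long; the conditional update in cancelOrder makes a duplicate message harmless.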

Recorded here for future use.

Feel free to share; when reposting, please credit the source: 内存溢出

Original article: http://outofmemory.cn/zaji/5665023.html
