redis + lua + rabbitmq实现高并发秒杀

redis + lua + rabbitmq实现高并发秒杀,第1张

mySeckill.lua

Lua优点:
减少网络开销:这个脚本只要执行一次,能减少网络传输
原子性:Redis将这个脚本作为原子执行要么全部成功或者失败,不担心并发问题,不需要事务,(PS:LUA脚本保证原子性,执行lua脚本时不会同时执行其它脚本或redis命令, 这种语义类似于MULTI/EXEC,这个lua脚本要么执行成功,要么执行失败
复用性:lua一旦执行就能永久性保存Redis的数据,可以供其它客户端使用 

-- 全局函数: 求阶乘 function factorial(n) if n == 1 then return 1 else return n * fact(n - 1) end end
--商品库存Key
local product_stock_key = KEYS[1]
--商品购买用户记录Key
local buyersKey = KEYS[2]
--用户ID
local uid = KEYS[3]

--校验用户是否重复秒杀
local result = redis.call("sadd" , buyersKey , uid )
if(tonumber(result) == 1)
then
    --初次秒杀
    local stock = redis.call("lpop" , product_stock_key )
    
    if(stock)
    then
        --库存>0
        return 1
    else
        --库存不足
        return -1
    end
else
    --重复秒杀
    return 2
end

方式一

 

SeckillController 

 PS:@MyAcessLimter(count = 1000,timeout = 1)

redis + lua限流 + AOP实现接口对客户端限流_Zxdwr520的博客-CSDN博客

@RestController
@RequestMapping("/seckill")
@Slf4j
public class SeckillController {
    @Autowired
    private ProductService productService;
    @Resource
    private RedisTemplate redisTemplate;
    
    @Autowired
    OrderProducer orderProducer;

    /**
     * 秒杀商品设置到Redis中
     * @author fan
     * @date 2022/5/9 17:56
     * @return java.util.List
    */
    @RequestMapping(value = "queryAll")
    @ResponseBody
    @MyAcessLimter(count = 1000,timeout = 1)
    public List queryAll(@RequestParam("seckillDate") String seckillDate) {
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        Gson gson = new Gson();
        List productList = productService.queryAll();//
        if (productList == null){
            return null;
        }
        for (Product product: productList) {
            long productId = product.getId();
            redisTemplate.opsForValue().set("product_" + productId, gson.toJson(product));
            // 一个用户只买一件商品
            // 商品购买用户Set
            redisTemplate.opsForSet().add("product_buyers_" + product.getId(), "");
            for (int i = 0; i < product.getStock(); i++) {
                redisTemplate.opsForList().leftPush("product_stock_key_" + product.getId(), String.valueOf(i));
            }
            System.out.println(gson.toJson(product));
        }
        redisTemplate.opsForValue().set("seckill_plan_" + seckillDate, gson.toJson(productList));//把商品信息存入缓存,列表展示用

        return productList;
    }    


    /**
     * 开始秒杀商品
     * @author fan
     * @date 2022/5/10 1:03
     * @param userId
     * @param productId
     * @return java.lang.String
    */
    @RequestMapping(value = "seckillProduct")
    @ResponseBody
    @MyAcessLimter(count = 1000,timeout = 1)
    public String seckillProduct(@RequestParam("uid") long userId, @RequestParam("pid") long productId){
        List list = Lists.newArrayList("product_stock_key_" + productId, "product_buyers_" + productId, userId + "");
        Long code = redisTemplate.execute(defaultRedisScript, list, "");
        if (code == -1) {
            return "库存不足";
        } else if (code == 2) {
            return "不允许重复秒杀";
        } else if (code == 1) {//加入队列
            return orderProducer.createOrderProducer("fanoutQueueOrder",productId,userId);
        }
        return "error";
    }


}

 加入队列

@Component
@Slf4j
public class OrderProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private RedisTemplate redisTemplate;
    @Autowired
    private RedisClient redisClient;

    @Autowired
    private DefaultRedisScript defaultRedisScript;

    public String createOrderProducer(String queueName,long productId,long userId){
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("timestamp", System.currentTimeMillis());
        String messageId = String.valueOf(UUID.randomUUID());
        if (productId > 0 && userId > 0) {
            String productJson = (String) redisTemplate.opsForValue().get("product_" + productId);
            jsonObject.put("productJson", productJson);
            jsonObject.put("id", messageId);
            jsonObject.put("productId", productId + "");
            jsonObject.put("userId", userId + "");
            String jsonString = jsonObject.toJSONString();
            Message message = MessageBuilder.withBody(jsonString.getBytes())
                    .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                    .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                    .setDeliveryTag(System.currentTimeMillis())
                    .setContentEncoding("utf-8")
                    .setMessageId(messageId)
                    .build();
            rabbitTemplate.convertAndSend(queueName, message);
            return "success";
        }else {
            return "error";
        }
    }
}

 同步数据到DB

@Slf4j
@Component
public class OrderConsumer {

    @Autowired
    private RedisTemplate redisTemplate;
    @Autowired
    private RedisClient redisClient;

    @Autowired
    private OrderService orderService;
    @Autowired
    private ProductService productService;

    @RabbitListener(queues = "fanoutQueueOrder")
    public void createOrderConsumer(Message message, Channel channel) throws Exception {
        log.info("OrderConsumer  消费者收到消息:{}" , JSONObject.toJSONString(message));
        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(), "UTF-8");
        String messageIdRedis = //(String) redisTemplate.opsForValue().get("messageId");
                redisClient.getString("messageId");
        if (messageId != null) {//避免消费者消息重复
            if (!messageId.equals(messageIdRedis)) {
                redisClient.setString(messageId,messageId,3_000L * 600);// 写入缓存
                System.out.println("number==" + channel.getChannelNumber());
                JSONObject jsonObject = JSONObject.parseObject(msg);
                long deliverTag = message.getMessageProperties().getDeliveryTag();
                try {
                    //具体业务
                    String id = (String) jsonObject.get("id");
                    String productId = (String) jsonObject.get("productId");
                    String userId = (String) jsonObject.get("userId");
                    String productJson = (String) jsonObject.get("productJson");
                    if (productJson != null) {
                        Gson gson = new Gson();
                        Product product = gson.fromJson(productJson, Product.class);
                        Order order = new Order();
                        order.setProductId(Long.parseLong(productId));
                        order.setUserId(Long.parseLong(userId));
                        order.setId(id);
                        order.setOrderName("抢购" + product.getName());
                        order.setProductName(product.getName());
                        int p = productService.updateProduct(product.getId());//开启事务
                        if (p > 0) {
                            int i = orderService.insert(order);//开启事务
                            //loggerService.saveLog(jsonObject);//日志记录
                            if (i > 0 && p > 0) {
                                log.info("创建订单成功:i=" + i);
                                log.info("商品库存减-1成功:p=" + p);
                                LocalDateTime localDateTime = LocalDateTime.now();
                                Date dataTime = Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
                                log.info("dataTime=" + dataTime);
                                channel.basicAck(deliverTag, true);//手动设置ack,消费成功,确认消息
                            }else {
                                channel.basicNack(deliverTag, false, true);
                            }
                        }
                    }

                }catch (Exception e){
                    try {
                        
                        channel.basicNack(deliverTag, false, true);
                    } catch (Exception ioException) {
                        log.error("重新放入队列失败,失败原因:{}",e.getMessage(),e);
                    }
                    log.error("TopicConsumer消费者出错,mq参数:{},错误信息:{}",message,e.getMessage(),e);

                    
                }
                //redisTemplate.opsForValue().set(messageId, messageId, 30, TimeUnit.SECONDS);//时间具体根据业务定

                System.out.println("消费消息jsonObject:" + jsonObject + ",messageId:" + messageId);
            }
        }

    }
}

方式二

LAU不变,开始秒杀时整个过程在缓存中进行,再从缓存中拿数据同步到数据库 

SeckillController
    /**
     * 秒杀商品设置到Redis中,返回给前端页面展示
     * @author fan
     * @date 2022/5/9 17:56
     * @return java.util.List
    */
    @RequestMapping(value = "queryAll")
    @ResponseBody
    @MyAcessLimter(count = 1000,timeout = 1)
    public List queryAll(@RequestParam("seckillDate") String seckillDate) {
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        Gson gson = new Gson();
        List productList = productService.queryAll();//
        if (productList == null){
            return null;
        }
        for (Product product: productList) {
            long productId = product.getId();
            redisTemplate.opsForValue().set("product_" + productId, gson.toJson(product));
            // 一个用户只买一件商品
            // 商品购买用户Set
            redisTemplate.opsForSet().add("product_buyers_" + product.getId(), "");
            for (int i = 0; i < product.getStock(); i++) {
                redisTemplate.opsForList().leftPush("product_stock_key_" + product.getId(), String.valueOf(i));
            }
            System.out.println(gson.toJson(product));
        }
        redisTemplate.opsForValue().set("seckill_plan_" + seckillDate, gson.toJson(productList));//把商品信息存入缓存,列表展示用

        return productList;
    }


/**
     * 开始秒杀商品,并限流--- 这里秒杀时间的操作略
     * @author fan
     * @date 2022/5/10 1:03
     * @param userId
     * @param productId
     * @return java.lang.String
    */
    @RequestMapping(value = "seckillProduct")
    @ResponseBody
    @MyAcessLimter(count = 1000,timeout = 1)
    public String seckillProduct(@RequestParam("uid") long userId, @RequestParam("pid") long productId){
        List list = Lists.newArrayList("product_stock_key_" + productId, "product_buyers_" + productId,   "" + userId );
        Long code = redisTemplate.execute(defaultRedisScript, list, "");
        if (code == -1) {
            return "库存不足";
        } else if (code == 2) {
            return "不允许重复秒杀";
        } else if (code == 1) {//整个秒杀过程在缓存中进行,秒杀结束后从缓存中拿数据库加入队列同步到数据库中
            String productJson = redisTemplate.opsForValue().get("product_" + productId);
            Gson gson = new Gson();
            Product product = gson.fromJson(productJson, Product.class);
            Order order = new Order();
            order.setProductId(productId);
            order.setUserId(userId);
            String id = String.valueOf(UUID.randomUUID());
            order.setId(id);
            order.setOrderName("抢购" + product.getName());
            order.setProductName(product.getName());
            redisClient.setString("order_" + id, gson.toJson(order));
            return "sueccss";
        }
        return "error";
    }

    /**
     * 我们可以在夜深人静的时候做点事情,比如同步订单到数据库
     * @author fan
     * @date 2022/5/10 2:33
     * @return java.lang.String
    */
    @GetMapping(value = "createOrderProducer")
    @ResponseBody
    @Scheduled(cron = "0 0 0 * * ?")
    public String createOrderProducer(){
        Set orderKeys = redisTemplate.keys("order_*");
        for (String orderKey: orderKeys) {//从缓存拿订单数据加入队列
            if (orderKey != null) {
                String orderJson = redisClient.getString(orderKey);
                Gson gson = new Gson();
                Order order = gson.fromJson(orderJson, Order.class);//从缓存取出订单
                orderToRedisProducer.createOrderToRedisProducer("fanoutQueueOrder", order);
            }
        }
        return "sueccss";
    }

生产者

@Component
@Slf4j
public class OrderToRedisProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private RedisTemplate redisTemplate;
    @Autowired
    private RedisClient redisClient;

    @Autowired
    private DefaultRedisScript defaultRedisScript;
    

    public String createOrderToRedisProducer(String queueName, Order order){
        JSONObject jsonObject = new JSONObject();
        String messageId = String.valueOf(UUID.randomUUID());
        jsonObject.put("timestamp", System.currentTimeMillis());
        if (order != null) {
            String productJson = (String) redisTemplate.opsForValue().get("product_" + order.getProductId());
            jsonObject.put("productJson", productJson);
            jsonObject.put("id", order.getId());
            jsonObject.put("productId", order.getProductId() + "");
            jsonObject.put("userId", order.getUserId() + "");
            String jsonString = jsonObject.toJSONString();
            Message message = MessageBuilder.withBody(jsonString.getBytes())
                    .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                    .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                    .setDeliveryTag(System.currentTimeMillis())
                    .setContentEncoding("utf-8")
                    .setMessageId(messageId)
                    .build();
            rabbitTemplate.convertAndSend(queueName, message);
            return "success";
        }else {
            return "error";
        }
    }
}

@Slf4j
@Component
public class OrderToRedisConsumer {

    @Autowired
    private RedisClient redisClient;

    @Autowired
    private OrderService orderService;
    @Autowired
    private ProductService productService;

    @RabbitListener(queues = "fanoutQueueOrder")
    public void createOrderConsumer(Message message, Channel channel) throws Exception {
        log.info("OrderConsumer  消费者收到消息:{}" , JSONObject.toJSONString(message));
        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(), "UTF-8");
        String messageIdRedis = //(String) redisTemplate.opsForValue().get("messageId");
                redisClient.getString("messageId");
        if (messageId != null) {//避免消费者消息重复
            if (!messageId.equals(messageIdRedis)) {
                redisClient.setString(messageId,messageId,3_000L * 600);// 写入缓存
                System.out.println("number==" + channel.getChannelNumber());
                JSONObject jsonObject = JSONObject.parseObject(msg);
                long deliverTag = message.getMessageProperties().getDeliveryTag();
                try {
                    //具体业务
                    String id = (String) jsonObject.get("id");
                    String productId = (String) jsonObject.get("productId");
                    String userId = (String) jsonObject.get("userId");
                    String productJson = (String) jsonObject.get("productJson");
                    if (productJson != null) {
                        Gson gson = new Gson();
                        Product product = gson.fromJson(productJson, Product.class);
                        Order order = new Order();
                        order.setProductId(Long.parseLong(productId));
                        order.setUserId(Long.parseLong(userId));
                        order.setId(id);
                        order.setOrderName("抢购" + product.getName());
                        order.setProductName(product.getName());
                        int p = productService.updateProduct(product.getId());//开启事务
                        if (p > 0) {
                            int i = orderService.insert(order);//开启事务
                            //loggerService.saveLog(jsonObject);//日志记录
                            if (i > 0 && p > 0) {
                                log.info("创建订单成功:i=" + i);
                                log.info("商品库存减-1成功:p=" + p);
                                LocalDateTime localDateTime = LocalDateTime.now();
                                Date dataTime = Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
                                log.info("dataTime=" + dataTime);
                                channel.basicAck(deliverTag, true);//手动设置ack,消费成功,确认消息
                            }else {
                                channel.basicNack(deliverTag, false, true);
                            }
                        }
                    }

                }catch (Exception e){
                    try {
                       
                        channel.basicNack(deliverTag, false, true);
                    } catch (Exception ioException) {
                        log.error("重新放入队列失败,失败原因:{}",e.getMessage(),e);
                    }
                    log.error("TopicConsumer消费者出错,mq参数:{},错误信息:{}",message,e.getMessage(),e);

                 
                }
                

                System.out.println("消费消息jsonObject:" + jsonObject + ",messageId:" + messageId);
            }
        }
    }
}

 效果图:

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/904967.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-15
下一篇 2022-05-15

发表评论

登录后才能评论

评论列表(0条)

保存