RabbitMQ通过DelayExchange实现延时队列

RabbitMQ通过DelayExchange实现延时队列,第1张

RabbitMQ通过DelayExchange实现延时队列

目录

1、DelayExchange插件配置

1.1、下载DelayExchange插件1.2、安装DelayExchange插件1.3、启动DelayExchange插件 2、使用原理3、使用说明4、完整代码5、测试效果

1、DelayExchange插件配置 1.1、下载DelayExchange插件

地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

选择对应版本进行下载,比如:rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez

1.2、安装DelayExchange插件

将下载的文件拷贝到服务器,比如root目录下;

docker环境 *** 作:

docker搭建时,拷贝到容器的/opt/rabbitmq/plugins/中
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/opt/rabbitmq/plugins/

linux环境 *** 作:

# 如果不是docker搭建,则拷贝到/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.27/plugins/目录下
cp rabbitmq_delayed_message_exchange-3.9.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.27/plugins
1.3、启动DelayExchange插件

docker环境 *** 作:

docker exec -it rabbitmq bash

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

linux环境 *** 作:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

2、使用原理

DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:

接收消息判断消息是否具备x-delay属性如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间返回routing not found结果给消息发送者x-delay时间到期后,重新投递消息到指定队列 3、使用说明

声明一个交换机,交换机的类型可以是任意类型,通过@RabbitListener注解绑定交换机并且设定delayed属性为true即可,如下:

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "queue_delay"),
            exchange = @Exchange(name = "exchange_delay", delayed = "true"), key = "routing_delay"))

发送消息时,通过MessageBuilder组织消息,并且设置设置header的key为x-delayvalue为TTL,TTL单位为毫秒,如下:

Message message = MessageBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)).setHeader("x-delay", 1000).build();
4、完整代码

Controller:

    
    @PostMapping("/sendDelay")
    public String sendDelay() {
        String msg = "发送时间:" + LocalDateTime.now() + ",延时:";
        for (int i = 1; i <= 10; i++) {
            String sendMsg = msg + i + "秒";
            long time = i * 1000;
            rabbitTemplate.convertAndSend(RabbitmqContext.EXCHANGE_DELAY, RabbitmqContext.ROUTING_DELAY, sendMsg, message -> {
                //注意这里时间可以使long,而且是设置header
                message.getMessageProperties().setHeader("x-delay", time);
                return message;
            });
        }
        return "发送成功...";
    }

Config:

@Configuration
public class DelayConfig {


    
    @Bean
    public Queue delayQueue() {
        return new Queue(RabbitmqContext.QUEUE_DELAY);
    }

    
    @Bean
    public CustomExchange delayExchange() {
        Map args = new HashMap<>(2);
        args.put("x-delayed-type", "direct");
        // x-delayed-message 固定写法
        return new CustomExchange(RabbitmqContext.EXCHANGE_DELAY, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding delayBinding() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(RabbitmqContext.ROUTING_DELAY).noargs();
    }
}

Listener:

@Component
@Slf4j
public class DelayReceiveListener {

    
    @RabbitListener(queues = "delay_queue")
    public void receiveQueue(String msg, Channel channel, Message message) {
        System.out.println("这个是延时消息:" + msg + ",当前时间:" + LocalDateTime.now());
    }
}
5、测试效果


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5705606.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存