rabbitmq安装延时队列插件实现延时队列

rabbitmq安装延时队列插件实现延时队列,第1张

rabbitmq安装延时队列插件实现延时队列 下载插件地址

要注意和自己的rabbitmq的版本对应起来
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

我的mq是docker安装的3.9.7的

下载完之后把插件copy到mq的plugin目录下,然后启用插件。之后重启容器,我这里是docker-compose安装的

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
docker-compose restart


进入rabbitmq管理页面查看插件是否安装成功

在Type里面查看是否有x-delayed-message选项,如果存在就代表插件安装成功。

使用mq延时队列插件下springboot实现延时队列

yaml配置mq,然后在mq管理页面创建虚拟host:fchan

spring:
  rabbitmq:
    host: 110.40.181.73
    port: 35672
    username: root
    password: 10086
    virtual-host: /fchan

配置延时队列和延时交换机的绑定

package com.fchan.mq.mqDelay;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MqDelayConfig {

    //最后经过死信队列转发后实际消费的交换机
    private static final String EXCHANGE_NAME = "delayed_exchange";
    //最后经过死信队列转发后实际消费的队列
    private static final String QUEUE_NAME = "delayed_queue";
    //最后经过死信队列转发后实际消费的路由key
    private static final String ROUTE_KEY = "delayed_key";

    
    @Bean
    CustomExchange exchange() {
        //通过x-delayed-type参数设置fanout /direct / topic / header 类型
        Map args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(EXCHANGE_NAME, "x-delayed-message",true, false,args);
    }

    
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME,true,false,false);
    }

    
    @Bean
    public Binding binding(CustomExchange exchange, Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTE_KEY)
                .noargs();
    }
}

消息生产者

package com.fchan.mq.mqDelay;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component
public class MyRabbitSender {

    Logger log = LoggerFactory.getLogger(MyRabbitSender.class);

    private static final String ROUTE_KEY = "delayed_key";
    private static final String EXCHANGE_NAME = "delayed_exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    
    public void send2(String msg, int delay) {
        log.info("RabbitSender.send() msg = {}", msg);
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTE_KEY, msg, message -> {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);  //消息持久化
            message.getMessageProperties().setDelay(delay * 1000);   // 单位为毫秒
            return message;
        });
    }
}

消息消费者

package com.fchan.mq.mqDelay;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyRabbitConsume {
    Logger log = LoggerFactory.getLogger(MyRabbitConsume.class);

    @RabbitListener(queues = "delayed_queue")
    public void infoConsumption(String data) throws Exception {
        log.info("收到信息:{}",data);
        log.info("然后进行一系列逻辑处理 Thanks♪(・ω・)ノ");
    }
}

参考了大佬的博文
https://juejin.cn/post/6977516798828609567#heading-13

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/4658846.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-06
下一篇 2022-11-06

发表评论

登录后才能评论

评论列表(0条)

保存