要注意和自己的rabbitmq的版本对应起来
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
我的mq是docker安装的3.9.7的
下载完之后把插件copy到mq的plugin目录下,然后启用插件。之后重启容器,我这里是docker-compose安装的
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
docker-compose restart
在Type里面查看是否有x-delayed-message选项,如果存在就代表插件安装成功。
yaml配置mq,然后在mq管理页面创建虚拟host:fchan
spring: rabbitmq: host: 110.40.181.73 port: 35672 username: root password: 10086 virtual-host: /fchan
配置延时队列和延时交换机的绑定
package com.fchan.mq.mqDelay; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class MqDelayConfig { //最后经过死信队列转发后实际消费的交换机 private static final String EXCHANGE_NAME = "delayed_exchange"; //最后经过死信队列转发后实际消费的队列 private static final String QUEUE_NAME = "delayed_queue"; //最后经过死信队列转发后实际消费的路由key private static final String ROUTE_KEY = "delayed_key"; @Bean CustomExchange exchange() { //通过x-delayed-type参数设置fanout /direct / topic / header 类型 Mapargs = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(EXCHANGE_NAME, "x-delayed-message",true, false,args); } @Bean public Queue queue() { return new Queue(QUEUE_NAME,true,false,false); } @Bean public Binding binding(CustomExchange exchange, Queue queue) { return BindingBuilder .bind(queue) .to(exchange) .with(ROUTE_KEY) .noargs(); } }
消息生产者
package com.fchan.mq.mqDelay; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MyRabbitSender { Logger log = LoggerFactory.getLogger(MyRabbitSender.class); private static final String ROUTE_KEY = "delayed_key"; private static final String EXCHANGE_NAME = "delayed_exchange"; @Autowired private RabbitTemplate rabbitTemplate; public void send2(String msg, int delay) { log.info("RabbitSender.send() msg = {}", msg); rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTE_KEY, msg, message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); //消息持久化 message.getMessageProperties().setDelay(delay * 1000); // 单位为毫秒 return message; }); } }
消息消费者
package com.fchan.mq.mqDelay; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class MyRabbitConsume { Logger log = LoggerFactory.getLogger(MyRabbitConsume.class); @RabbitListener(queues = "delayed_queue") public void infoConsumption(String data) throws Exception { log.info("收到信息:{}",data); log.info("然后进行一系列逻辑处理 Thanks♪(・ω・)ノ"); } }
参考了大佬的博文
https://juejin.cn/post/6977516798828609567#heading-13
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)