RabbitMQ本身并不提供延迟队列的功能,但是我们仍然可以使用RabbitMQ的 TTL(Time-To-Live) 和 DLX(Dead Letter Exchanges) 这两个扩展特性来实现延迟队列,实现消息的延迟消费和延迟重试的功能。
实现结果-
固定时间延迟消费
-
指定时间消费
package com.itdfq.delay.config; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { @Value("${spring.rabbitmq.addresses}") private String address; @Value("${spring.rabbitmq.port}") private String port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; //连接工厂 @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(address + ":" + port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); //TODO 消息发送确认--回调 // connectionFactory.setPublisherConfirms(true); return connectionFactory; } //RabbitAdmin类封装对RabbitMQ的管理 *** 作 @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } //使用Template @Bean public RabbitTemplate newRabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); //设置监听确认mq(交换器)接受到信息 rabbitTemplate.set/confirm/iCallback(/confirm/iCallback()); //添加监听 失败鉴定(路由没有收到) rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(returnCallback()); return rabbitTemplate; } / @Bean public Queue DelayQueue() { Map设置立即消费监听params = new HashMap<>(); // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称, params.put("x-dead-letter-exchange", RabbitMqConstant.IMMEDIATE_EXCHANGE); // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。 params.put("x-dead-letter-routing-key", RabbitMqConstant.IMMEDIATE_ROUTING_KEY); // x-message-ttl 声明该队列死信可存活时间 params.put("x-message-ttl", RabbitMqConstant.DELAY_TIME); return new Queue(RabbitMqConstant.DELAY_QUEUE, true, false, false, params); }
package com.itdfq.delay.message.listen; import com.itdfq.delay.constant.RabbitMqConstant; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DelayListenConfig { @Autowired private RabbitAdmin rabbitAdmin; @Autowired private ConnectionFactory connectionFactory; @Bean public DirectExchange Exchange() { DirectExchange exchange = new DirectExchange( RabbitMqConstant.IMMEDIATE_EXCHANGE, true, false); exchange.setAdminsThatShouldDeclare(rabbitAdmin); return exchange; } @Bean public Queue Queue() { Queue queue = new Queue(RabbitMqConstant.IMMEDIATE_QUEUE, true, false, false); queue.setAdminsThatShouldDeclare(rabbitAdmin); return queue; } @Bean public Binding subscribeNotifyBinding() { Binding binding = BindingBuilder.bind(Queue()).to(Exchange()) .with(RabbitMqConstant.IMMEDIATE_ROUTING_KEY); binding.setAdminsThatShouldDeclare(rabbitAdmin); return binding; } @Bean public SimpleMessageListenerContainer container( @Qualifier(value = "delayRabbitmqListener") DelayRabbitmqListener delayRabbitmqListener) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueues(Queue()); container.setMessageListener(delayRabbitmqListener); container.setDefaultRequeueRejected(false); //手动提交 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置消费者ack消息的模式,默认是自动,此处设置为手动 // container.setAcknowledgeMode(AcknowledgeMode.MANUAL); return container; } }
指定时间(延时消费) 设置延时队列
@Bean public Queue variableDelayQueue() { Map消费者params = new HashMap<>(); // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称, params.put("x-dead-letter-exchange", RabbitMqConstant.IMMEDIATE_EXCHANGE); // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。 params.put("x-dead-letter-routing-key", RabbitMqConstant.IMMEDIATE_ROUTING_KEY); return new Queue(RabbitMqConstant.DELAY_VARIABLE_QUEUE_KEY, true, false, false, params); }
public void send(String msg,Integer expiration){ rabbitTemplate.convertAndSend(RabbitMqConstant.DELAY_VARIABLE_EXCHANGE_KEY, RabbitMqConstant.DELAY_VARIABLE_ROUTING_KEY, msg, message -> { log.info("可变延时消费发送消息: {}, and expiration in {}ms", msg, expiration); message.getMessageProperties().setExpiration(expiration.toString()); return message; }); }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)