RabbitMq延迟消费(TTL实现)

RabbitMq延迟消费(TTL实现),第1张

RabbitMq延迟消费(TTL实现) 延时消费

RabbitMQ本身并不提供延迟队列的功能,但是我们仍然可以使用RabbitMQ的 TTL(Time-To-Live) 和 DLX(Dead Letter Exchanges) 这两个扩展特性来实现延迟队列,实现消息的延迟消费和延迟重试的功能。

实现结果
  • 固定时间延迟消费

  • 指定时间消费

具体实现 连接配置
package com.itdfq.delay.config;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitConfig {

    // Broker address, injected from the spring.rabbitmq.addresses property.
    // NOTE(review): connectionFactory() appends ":" + port to this value, so it is
    // presumably a bare host name rather than a host:port list — confirm.
    @Value("${spring.rabbitmq.addresses}")
    private String address;
    // Broker port, injected from spring.rabbitmq.port and kept as a String.
    @Value("${spring.rabbitmq.port}")
    private String port;
    // User name passed to the connection factory.
    @Value("${spring.rabbitmq.username}")
    private String username;
    // Password passed to the connection factory.
    @Value("${spring.rabbitmq.password}")
    private String password;
    // Virtual host passed to the connection factory.
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;


    /**
     * Builds the connection factory used to reach the RabbitMQ broker.
     *
     * <p>The broker address is assembled as {@code address:port} from the injected
     * properties; user name, password and virtual host are applied as injected.
     *
     * @return a caching connection factory configured from the injected properties
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        String brokerAddress = address + ":" + port;

        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setAddresses(brokerAddress);
        factory.setVirtualHost(virtualHost);
        factory.setUsername(username);
        factory.setPassword(password);
        // TODO: publisher confirms (send-acknowledgement callbacks) are not enabled here yet.
        return factory;
    }

    /**
     * Exposes a {@link RabbitAdmin}, the class that wraps RabbitMQ management
     * operations, bound to the given connection factory.
     *
     * @param connectionFactory the connection factory the admin should use
     * @return a new admin instance backed by {@code connectionFactory}
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin admin = new RabbitAdmin(connectionFactory);
        return admin;
    }

    //使用Template
    @Bean
    public RabbitTemplate newRabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        //设置监听确认mq(交换器)接受到信息
        rabbitTemplate.set/confirm/iCallback(/confirm/iCallback());
        //添加监听 失败鉴定(路由没有收到)
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(returnCallback());
        return rabbitTemplate;
    }


    /
    @Bean
    public Queue DelayQueue() {
        Map params = new HashMap<>();
        // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
        params.put("x-dead-letter-exchange", RabbitMqConstant.IMMEDIATE_EXCHANGE);
        // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
        params.put("x-dead-letter-routing-key", RabbitMqConstant.IMMEDIATE_ROUTING_KEY);
        // x-message-ttl 声明该队列死信可存活时间
        params.put("x-message-ttl", RabbitMqConstant.DELAY_TIME);
        return new Queue(RabbitMqConstant.DELAY_QUEUE, true, false, false, params);
    }
设置立即消费监听
package com.itdfq.delay.message.listen;

import com.itdfq.delay.constant.RabbitMqConstant;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Declares the exchange, queue and binding identified by the
 * {@code RabbitMqConstant.IMMEDIATE_*} constants, plus the listener container that
 * consumes from that queue with manual acknowledgement.
 */
@Configuration
public class DelayListenConfig {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Autowired
    private ConnectionFactory connectionFactory;

    /**
     * Declares the direct exchange named {@code IMMEDIATE_EXCHANGE}.
     *
     * @return a durable, non-auto-delete direct exchange declared through {@code rabbitAdmin}
     */
    @Bean
    public DirectExchange Exchange() {
        DirectExchange immediateExchange =
                new DirectExchange(RabbitMqConstant.IMMEDIATE_EXCHANGE, true, false);
        immediateExchange.setAdminsThatShouldDeclare(rabbitAdmin);
        return immediateExchange;
    }

    /**
     * Declares the queue named {@code IMMEDIATE_QUEUE}.
     *
     * @return a durable, non-exclusive, non-auto-delete queue declared through {@code rabbitAdmin}
     */
    @Bean
    public Queue Queue() {
        Queue immediateQueue =
                new Queue(RabbitMqConstant.IMMEDIATE_QUEUE, true, false, false);
        immediateQueue.setAdminsThatShouldDeclare(rabbitAdmin);
        return immediateQueue;
    }

    /**
     * Binds the queue from {@link #Queue()} to the exchange from {@link #Exchange()}
     * under {@code IMMEDIATE_ROUTING_KEY}.
     *
     * @return the binding, declared through {@code rabbitAdmin}
     */
    @Bean
    public Binding subscribeNotifyBinding() {
        Queue source = Queue();
        DirectExchange target = Exchange();
        Binding immediateBinding = BindingBuilder
                .bind(source)
                .to(target)
                .with(RabbitMqConstant.IMMEDIATE_ROUTING_KEY);
        immediateBinding.setAdminsThatShouldDeclare(rabbitAdmin);
        return immediateBinding;
    }

    /**
     * Builds the listener container that delivers messages from {@link #Queue()} to
     * the given listener.
     *
     * @param delayRabbitmqListener the listener bean qualified as {@code delayRabbitmqListener}
     * @return a container using manual acknowledgement that does not requeue rejected messages
     */
    @Bean
    public SimpleMessageListenerContainer container(
            @Qualifier("delayRabbitmqListener") DelayRabbitmqListener delayRabbitmqListener) {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.setQueues(Queue());
        listenerContainer.setMessageListener(delayRabbitmqListener);
        // The consumer acknowledges explicitly; the container default would be automatic.
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // Rejected deliveries are not put back on the queue.
        listenerContainer.setDefaultRequeueRejected(false);
        return listenerContainer;
    }

}


指定时间(延时消费) 设置延时队列
   
    /**
     * Declares the variable-delay queue.
     *
     * <p>Unlike a queue with {@code x-message-ttl}, this queue sets no TTL of its own;
     * it only configures dead-lettering to {@code IMMEDIATE_EXCHANGE} with
     * {@code IMMEDIATE_ROUTING_KEY}, so the delay has to come from each message's own
     * expiration property.
     *
     * <p>NOTE(review): per the RabbitMQ TTL documentation, a per-message TTL is only
     * acted on when the message reaches the head of the queue, so a message with a
     * short expiration queued behind one with a long expiration is not dead-lettered
     * until the earlier one leaves — confirm this ordering is acceptable.
     *
     * @return a durable, non-exclusive, non-auto-delete queue with the DLX arguments above
     */
    @Bean
    public Queue variableDelayQueue() {
        Map<String, Object> params = new HashMap<>();
        // x-dead-letter-exchange: name of the DLX that dead-lettered messages are forwarded to.
        params.put("x-dead-letter-exchange", RabbitMqConstant.IMMEDIATE_EXCHANGE);
        // x-dead-letter-routing-key: routing key attached to messages when they are dead-lettered.
        params.put("x-dead-letter-routing-key", RabbitMqConstant.IMMEDIATE_ROUTING_KEY);
        return new Queue(RabbitMqConstant.DELAY_VARIABLE_QUEUE_KEY, true, false, false, params);
    }
生产者(发送带过期时间的消息)
 
    /**
     * Publishes {@code msg} to {@code DELAY_VARIABLE_EXCHANGE_KEY} with routing key
     * {@code DELAY_VARIABLE_ROUTING_KEY}, setting the message's expiration property to
     * {@code expiration}.
     *
     * @param msg        the payload to convert and send
     * @param expiration the per-message TTL; logged as milliseconds and written to the
     *                   message's expiration property as a decimal string
     * @throws IllegalArgumentException if {@code expiration} is {@code null} or negative
     */
    public void send(String msg, Integer expiration) {
        // Fail fast with a clear message instead of an NPE inside the post-processor
        // (null) or a broker-side rejection of the published message (negative TTL).
        if (expiration == null || expiration < 0) {
            throw new IllegalArgumentException(
                    "expiration must be a non-negative number of milliseconds, but was " + expiration);
        }
        final String ttl = expiration.toString();
        rabbitTemplate.convertAndSend(RabbitMqConstant.DELAY_VARIABLE_EXCHANGE_KEY,
                RabbitMqConstant.DELAY_VARIABLE_ROUTING_KEY, msg,
                message -> {
                    log.info("可变延时消费发送消息: {}, and expiration in {}ms", msg, expiration);
                    // The expiration property is a string holding the TTL.
                    message.getMessageProperties().setExpiration(ttl);
                    return message;
                });
    }

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/4685314.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-07
下一篇 2022-11-07

发表评论

登录后才能评论

评论列表(0条)

保存