Rabbitmq基于插件的延迟队列

Rabbitmq基于插件的延迟队列,第1张

Rabbitmq基于插件的延迟队列 Rabbitmq插件优化死信队列

之前基于死信书写的延迟消息例子中,消费者并没有首先消费延迟时间较短的队列
因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,
如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行

要想解决上述小bug,需要用到rabbitmq的一个插件rabbitmq_message_timestamp-3.8.0.ez
首先去rabbitmq官网下载该插件
下载完之后将该插件拷贝到rabbitmq的安装路径下的plugins路径下:

输入拷贝命令: cp rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
进入到刚才拷贝的路径: cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
执行安装命令:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
启动rabbitmq
然后进入rabbitmq网页管理中,在新建交换机的类型下拉框中会发现新增了x-delayed-message选项

书写延迟配置类代码:

Configuration
public class DelayedQueueConfig {
    //队列
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    //交换机
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    //routingkey
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    //声明队列
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }
    //声明基于插件的自定义交换机
    @Bean
    public CustomExchange delayedExchange(){
        Map arguments = new HashMap<>();
        arguments.put("x-delayed-type","direct");
        
        return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",
                true,false,arguments);
    }

    //绑定交换机与队列的关系
    @Bean
    public Binding delayedQueueBindingDelayedExchange(
            @Qualifier("delayedQueue") Queue delayedQueue,
            @Qualifier("delayedExchange") CustomExchange delayedExchange){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

在生产者中添加基于插件的延迟消息发送方法:

//开始发消息 基于插件的延迟消息
    @GetMapping("/sendExpirationMsgs/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime){
        log.info("当前时间:{},发送一条时长{}毫秒delay信息给延迟队列delayed.queue:{}",
                new Date().toString(),delayTime,message);
        rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg->{
            //发送消息的时候 延迟时长 单位ms
            msg.getMessageProperties().setDelay(delayTime);
            return msg;
        });
    }

书写基于插件延迟消息消费者代码:

@Component
@Slf4j
public class DelayConsumer {
    //监听消息
    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveDelayQueue (Message message){
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);
    }
}

启动主启动类,然后在浏览器中发送两条消息
其中第一条消息的延迟时间较长,第二条消息的延迟时间较短,然后对比结果看看和以前写的死信队列的方式有什么不一样
http://localhost:8080/ttl/sendExpirationMsgs/你好2/25000 延迟时间25s
http://localhost:8080/ttl/sendExpirationMsgs/你好/5000 延迟时间5s

可以看到消费者并没有等待第一条延迟时间较长的消息消费完之后再去消费第二条延迟时间较短的消息,解决了之前死信队列方式的小bug

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5654148.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存