RabbitMQ 详解

RabbitMQ 详解,第1张

RabbitMQ 详解

RabbitMQ 详解

MQ 的相关概念RabbitMQ 四大核心概念RabbitMQ 的工作原理RabbitMQ 六大核心部分(模式)简单模式工作模式

工作模式案例消息确认(消息应答)消息持久化 发布确认模式交换机(Exchange)

Exchange 概念临时队列绑定(bindings)扇出交换机(fanout exchange)直连交换机(direct exchange)主题交换机(topic exchange) 死信队列

概念及应用场景死信来源死信实例

消息 TTL 过期队列达到最大长度消息被拒绝,并且 requeue = false 延时队列

概念应用场景RabbitMQ 中的 TTL 发布确认高级RabbitMQ的其他知识点

幂等性优先队列惰性队列

准备工作:
1、Linux 环境
2、Linux 环境下安装 RabbitMQ :https://blog.csdn.net/qq_36763419/article/details/122152767

MQ 的相关概念

1、什么是 MQ ?
MQ(Message Queue),从字面意思上看,本质是个队列,FIFO先进先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游 逻辑解耦 + 物理解耦 的消息通信服务。使用了 MQ 之后,消息发送上游只需依赖 MQ,不用依赖其他的服务。

2、为什么要使用 MQ ?MQ的三大作用。
(1)流量削峰:MQ 中可以对请求进行排队处理,避免系统宕机。

(2)应用解耦
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单 *** 作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单 *** 作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。

(3)异步处理

3、主流消息中间件(MQ)分类及应用场景:https://blog.csdn.net/qq_36763419/article/details/122024344

RabbitMQ 四大核心概念

1、生产者:产生数据发送消息的程序是生产者。
2、交换机:交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面他讲消息推送到队列中。交换机必须确切的知道如何处理它接受的消息,是将这些消息推送到特定队列还是送到多个队列,亦或者是把消息丢弃,这个得由交换机类型决定。交换分为如下及中:直连交换机(direct exchange)、主题交换机(topic exchange)、标题交换机(headers exchange)、扇出交换机(fanout exchange)
3、队列:队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束。本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列中,许多消费者可以从队列中接收数据,这就是我们使用队列的方式。
4、消费者:消费者和接受具有相似的含义。消费者大多时候是一个等待接受消息的程序,请注意生产者、消费者、消息中间件大多时候并不在统一机器上,统一个应用既可以是生产者也可以是消费者。

RabbitMQ 的工作原理


Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker。
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等。
Connection:publisher/consumer 和 broker 之间的 TCP 连接。
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了 *** 作系统建立 TCP connection 的开销。
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point)、topic (publish-subscribe) and fanout (multicast)。
Queue:消息存储的数据结构,消息最终被送到这里等待 consumer 取走。
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。

RabbitMQ 六大核心部分(模式)

1、简单模式
2、工作模式
3、发布/订阅模式
4、路由模式
5、主题模式
6、消息确认模式

简单模式

在下图中,P是我们的生产者, C 是我们的消费者。中间的框是一个队列 RabbitMQ 代表使用者保留的消息缓冲区。

maven依赖:

        
        
            com.rabbitmq
            amqp-client
            5.14.0
        

一、消息生产生产者

package com.mq.rabbitmq.simplemode;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class Producer {

    //队列名称(路由键)
    private static final String SIMPLE_MODE_QUEUE_NAME = "routeKey";
    private static final String PRODUCER = "生产者发送";

    //发送消息
    public static void main(String[] args) {
        //创建一个连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.0.92");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");


        //使用try...with...resources可自动关闭资源
        
        try (
                //从连接工厂获取连接实例
                Connection connection = connectionFactory.newConnection();
                //创建一个通道
                Channel channel = connection.createChannel()
        ) {
            
            channel.queueDeclare(SIMPLE_MODE_QUEUE_NAME, false, false, false, null);
            //定义要消费的消息
            String message = "Hello World!";
            
            channel.basicPublish("", SIMPLE_MODE_QUEUE_NAME, null, message.getBytes());
            System.out.println(PRODUCER + message);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

二、消息消费者

package com.mq.rabbitmq.simplemode;

import com.rabbitmq.client.*;

public class Consumer {

    //队列名称(路由键)
    private static final String SIMPLE_MODE_QUEUE_NAME = "routeKey";
    private static final String ConSUMER = "消费者收到";

    public static void main(String[] args) {

        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.0.92");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");

        //使用try...with...resources可自动关闭资源
        try (
                //从连接工厂中获取连接实例
                Connection connection = connectionFactory.newConnection();
                //创建一个通道
                Channel channel = connection.createChannel()
        ) {
            System.out.println("等待接收消息......");
            //推送的消息如何进行消费的接口回调
            DeliverCallback deliverCallback = (consumerTag, deliver) -> {
                byte[] byteMsg = deliver.getBody();
                String message = new String(byteMsg);
                System.out.println(ConSUMER + message);
            };
            //取消消费的一个回调接口,如在消费的时候队列被删除掉了
            CancelCallback cancelCallback = consumerTag -> System.out.println("消息消费被中断");

            
            channel.basicConsume(SIMPLE_MODE_QUEUE_NAME, true, deliverCallback, cancelCallback);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
工作模式 工作模式案例

工作队列(又称任务队列)的主要思想就是避免立即执行资源密集型任务,而不得不等待它完成,相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的进程将d出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。类比负载均衡。

一、RabbitMQ连接工具类

package com.mq.rabbitmq.workmode;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQUtils {

    public static Channel getChannel() throws Exception {
        //创建一个连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.31.91");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");
        Connection connection = connectionFactory.newConnection();
        return connection.createChannel();
    }
}

二、消息生产者

package com.mq.rabbitmq.workmode;

import com.rabbitmq.client.Channel;

import java.util.Scanner;


public class WorkModeProducer {

    //路由键:routeKey
    private static final String WORK_MODE_QUEUE_NAME = "routeKey";
    private static final String PRODUCER = "生产者发送 ";

    public static void main(String[] args) {

        //获取连接通道
        try (Channel channel = RabbitMQUtils.getChannel()) {

            //创建队列
            channel.queueDeclare(WORK_MODE_QUEUE_NAME, false, false, false, null);
            //发送从控制台输入的消息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.next();
                //发布消息
                channel.basicPublish("", WORK_MODE_QUEUE_NAME, null, message.getBytes());
                System.out.println(PRODUCER + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

三、消息消费者(两份)

package com.mq.rabbitmq.workmode;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;


public class WorkModeConsumerOne {

    //定义路由键
    private static final String WORK_MODE_QUEUE_NAME = "routeKey";
    //消费者one
    private static final String ConSUMER = "消费者 one 收到 ";

    public static void main(String[] args) {
        //获取连接通道
        try {
            Channel channel = RabbitMQUtils.getChannel();
            //消息回调,确认回复等信息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                try {
                    byte[] body = delivery.getBody();
                    String receiveMessage = new String(body);
                    System.out.println(ConSUMER + receiveMessage);
                    
                    channel.basicAck(deliveryTag, true);
                } catch (Exception e) {
                    e.printStackTrace();
                    //消费者消费消息出现异常时,消息重回队列
                    
                    channel.basicNack(deliveryTag, true, true);
                    
                    //channel.basicReject(deliveryTag, true);
                }
            };
            //取消消费的一个回调接口,如在消费的时候队列被删除掉了
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println(consumerTag + "消息消费被中断");
            };
            System.out.println("等待消费者 one 消费");
            //开启手动确认后,autoAck要设置成false
            channel.basicConsume(WORK_MODE_QUEUE_NAME, false, deliverCallback, cancelCallback);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

由以上可知,rabbtimq在所有消费者中轮询分发消息,把消息均匀发送给所有消费者。以上代码实现了消息确认、消费异常消息重回队列

消息确认(消息应答)

一、自动确认(应答)
消息在发送后被认为已经发送成功(可能导致消息丢失),这种模式需要在高吞吐量和数据传输安全性方面做权衡,另一方面,该模式没有对传递的消息数量进行限制,消费者来不及处理是,可能会导致消息积压,所以自动应答的模式仅适用于消费者可以高效并以某种速率能够处理这些消息的情况下使用。

二、手动确认(应答):手动应答的好处是可以批量应答并且减少网络拥堵。

具体代码及解释,查看【工作模式 - 消费者】

消息持久化

消息持久化是为了保证 RabbitMQ服务 down 掉之后重启,能够保证生产者发送的消息不丢失。为了确保消息不丢失,需要做两件事:将 队列、消息 都标记为持久化。

一、队列持久化:RabbitMQ 重启后,队列任然存在

但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误。

二、消息持久化:将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是
这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没
有真正写入磁盘。持久性保证并不强。

三、不公平分发
RabbitMQ 分发消息的默认采用的使用轮询方式。为了在不同场景下设置不同的分发策略。可以通过如下参数设置:

发布确认模式

发布确认分为:
1、单个确认发布
2、批量确认发布
3、异步确认发布

//开启确认发布模式
channel./confirm/iSelect()

异步确认发布

一、生产者

package com.mq.rabbitmq.publish/confirm/i;

import com.mq.rabbitmq.workmode.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client./confirm/iCallback;
import com.rabbitmq.client./confirm/iListener;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;


public class Publish/confirm/iModeProducer {

    //路由键:routeKey
    private static final String WORK_MODE_QUEUE_NAME = "routeKey";
    private static final String PRODUCER = "生产者发送 ";

    public static void main(String[] args) {

        //获取连接通道
        try (Channel channel = RabbitMQUtils.getChannel()) {

            //创建队列
            //持久化队列:durable 设置为 true。
            //channel.queueDelete(WORK_MODE_QUEUE_NAME);
            //final boolean durable = true;

            
            //channel.basicQos(4);
            channel.queueDeclare(WORK_MODE_QUEUE_NAME, false, false, false, null);

            //开启发布确认模式
            channel./confirm/iSelect();
            
            ConcurrentSkipListMap unConfirmsListMap = new ConcurrentSkipListMap<>();

//            
//            ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
//                //删除确认的消息
//                if (multiple) {
//                    System.out.println("确认收到消息");
//
//                    //返回的是小于等于当前序列号的确认的消息,是一个Map,清除之后剩下的就是未被确认的消息
//                    ConcurrentNavigableMap confirmed = un/confirm/isListMap.headMap(deliveryTag, true);
//                    //清除该部分未确认的消息
//                    /confirm/ied.clear();
//                } else {
//                    //只清除当前序列的消息
//                    un/confirm/isListMap.remove(deliveryTag);
//                }
//            };
//            
//            ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
//                //打印未确认的消息
//                String message = un/confirm/isListMap.get(deliveryTag);
//                System.out.println("发布的消息" + message + "未确认,序列号:" + deliveryTag);
//            };
//               
//            channel.addConfirmListener(ackCallback,nackCallback);

            /confirm/iListener confirmListener = new /confirm/iListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println(multiple);
                    //删除确认的消息
                    if (multiple) {
                        System.out.println("确认收到消息");

                        //返回的是小于等于当前序列号的确认的消息,是一个Map,清除之后剩下的就是未被确认的消息
                        ConcurrentNavigableMap confirmed = un/confirm/isListMap.headMap(deliveryTag, true);
                        //清除该部分未确认的消息
                        /confirm/ied.clear();
                    } else {
                        //只清除当前序列的消息
                        un/confirm/isListMap.remove(deliveryTag);
                    }
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    //打印未确认的消息
                    String message = un/confirm/isListMap.get(deliveryTag);
                    System.out.println("发布的消息" + message + "未确认,序列号:" + deliveryTag);
                }
            };
            channel.add/confirm/iListener(/confirm/iListener);

            //发送从控制台输入的消息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.next();
                
                long nextPublishSeqNo = channel.getNextPublishSeqNo();
                un/confirm/isListMap.put(nextPublishSeqNo, message);
                //发布消息
                //当 durable 为 true 的时候
                channel.basicPublish("", WORK_MODE_QUEUE_NAME, null, message.getBytes());
                System.out.println(PRODUCER + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

二、消费者

package com.mq.rabbitmq.publish/confirm/i;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;


public class Publish/confirm/iModeConsumerOne {

    //定义路由键
    private static final String WORK_MODE_QUEUE_NAME = "routeKey";
    //消费者one
    private static final String ConSUMER = "消费者 one 收到 ";

    public static void main(String[] args) {


        AtomicReference receiveMessage = new AtomicReference<>("");
        //获取连接通道
        try {
            Channel channel = RabbitMQUtils.getChannel();
            //消息回调,确认回复等信息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                try {
                    byte[] body = delivery.getBody();
                    receiveMessage.set(new String(body));
                    System.out.println(ConSUMER + receiveMessage);
                    int num= Integer.valueOf(receiveMessage.get());
                    int a = 10 / num;
                    
                    channel.basicAck(deliveryTag, false);
                } catch (Exception e) {
                    e.printStackTrace();
                    //消费者消费消息出现异常时,消息重回队列(用于否定确认)
                    
//                    channel.basicNack(deliveryTag, true, true);
                    if (Objects.equals(receiveMessage.get(),"0")) {
                        channel.basicNack(deliveryTag, true, false);
                    }else {
                        channel.basicNack(deliveryTag, true, true);
                    }
                    
                    //channel.basicReject(deliveryTag, true);
                }
            };
            //取消消费的一个回调接口,如在消费的时候队列被删除掉了
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println(consumerTag + "消息消费被中断");
            };
            System.out.println("等待消费者 one 消费");
            //开启手动确认后,autoAck要设置成false
            channel.basicConsume(WORK_MODE_QUEUE_NAME, false, deliverCallback, cancelCallback);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
交换机(Exchange)

通过交换机将同一个消息转发给多个消费者消费,这种模式成为 发布/订阅 模式。

Exchange 概念

一、为什么要有交换机呢?

之前的简单模式和工作模式中,我们不难看出,消费者绑定的都是同一个队列,多个消费者之间存在竞争关系,也就是说同一个队列的消息消费者消费一次,不能重复消费。

现在这样一个需求,一个消息需要被消费多次。例如(日志系统):我们会启动两个消费者,其中一个消费者接收到消息后把日志存储在磁盘,另外一个消费者接收到消息后把消息打印在屏幕上,事实上第一个程序发出的日志消息将广播给所有消费者。那么该如何设计处理呢?可以使用交换机来解决此问题。设计理念图如下:

二、Exchange 的概念及分类

1、RabbitMQ 消息传递模型的核心思想是:生产者的消息从不会直接发送到队列。实际上,通常生产者升值都不知道这些消息传递到哪些队列中。相反,生产者只能将消息发送到交换机(Exchange),由交换机将消息转发并推送到队列当中。交换机必须确切的知道如何处理收到的消息,将消息放入特定的队列或者丢弃消息,这就由交换机的类型来决定。

2、交换机的类型如下:
(1)直连交换机(direct exchange)
(2)主题交换机(topic exchange)
(3)标题交换机(headers exchange)
(4)扇出交换机(fanout excahnge)
其实还有一个默认的交换机,通常情况下不命名都会使用默认的交换机。

临时队列

每当我们连接 RabbitMQ 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦断开了消费者的连接,队列兼备自动删除。 创建临时队列的方式如下:

String queueName = channel.queueDeclare().getQueue();

临时队列创建成功后:

绑定(bindings)

binding 的概念其实就是 exchange 与 queue 之间的桥梁,交换机(exchange)与队列(queue)之间是绑定(binding)关系。

扇出交换机(fanout exchange)

fanout exchange 非常简单,它是将收到的所有消息 广播 给所有绑定它的 queue 中。

实际案例: 生产者发送日志,将日志消息发送到 fanout exchange 中,再由 fanout exchange 将消息转发到绑定在改交换机的 临时queue,消费者 C1 控制台输出日志,消费者 C2 则将日志存储到磁盘中。

一、生产者

package com.mq.rabbitmq.fanoutexchange;

import com.rabbitmq.client.Channel;

import java.util.Scanner;


public class FanoutExchangeProducer {

    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) {

        try {
            //获取信道
            Channel channel = RabbitMQUtils.getChannel();
            //定义交换机:扇出交换机
           channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

            Scanner scanner = new Scanner(System.in);
            System.out.println("请输入日志信息");
            while (scanner.hasNext()) {
                String logMsg = scanner.nextLine();
                channel.basicPublish(EXCHANGE_NAME, "", null, logMsg.getBytes("UTF-8"));
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

二、消费者
1、控制台输出日志

package com.mq.rabbitmq.fanoutexchange;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ConsumerOne {

    private static final String EXCHANGE_NAME = "fanout_exchange";

    //定义路由键 routeKey(binding)
    private static final String ROUTE_KEY = "binding";

    public static void main(String[] args) {
        try {
            //获取信道
            Channel channel = RabbitMQUtils.getChannel();
            //定义交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            //通过系统生成随机的临时队列
            String queueName = channel.queueDeclare().getQueue();
            //将临时队列与交换机绑定
            channel.queueBind(queueName, EXCHANGE_NAME, ROUTE_KEY);

            //输出日志到控制台
            System.out.println("等待接收消息, 把接收到的消息打印在屏幕上......");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                //获取序列
                long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                try {
                    byte[] bytes = delivery.getBody();
                    String message = new String(bytes, "UTF-8");
                    System.out.println("控制台打印接收到的消息:" + message);
                    channel.basicAck(deliveryTag, true);
                } catch (Exception e) {
                    e.printStackTrace();
                    channel.basicNack(deliveryTag, true, true);
                }
            };
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("消息消费被中断");
            };
            channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2、将日志保存到文件

package com.mq.rabbitmq.fanoutexchange;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.apache.commons.io.FileUtils;

import java.io.File;


public class ConsumerTwo {

    private static final String EXCHANGE_NAME = "fanout_exchange";

    //定义路由键 routeKey(binding)
    private static final String ROUTE_KEY = "binding";

    public static void main(String[] args) {
        try {
            //获取信道
            Channel channel = RabbitMQUtils.getChannel();
            //定义交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            //通过系统生成随机的临时队列
            String queueName = channel.queueDeclare().getQueue();
            //将临时队列与交换机绑定
            channel.queueBind(queueName, EXCHANGE_NAME, ROUTE_KEY);

            //输出日志到控制台
            System.out.println("等待接收消息, 把接收到的消息保存到磁盘......");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                //获取序列
                long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    File file = new File("D:\rabbitmq_log.txt");
                    //此方式为横向拼接
//                    FileOutputStream fileOutputStream = new FileOutputStream(file, true);
//                    fileOutputStream.write(message.getBytes());
                    FileUtils.writeStringToFile(file, message, "UTF-8");
                    System.out.println("日志保存磁盘成功");
                    channel.basicAck(deliveryTag, true);
                } catch (Exception e) {
                    e.printStackTrace();
                    channel.basicNack(deliveryTag, true, true);
                }
            };
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("消息消费被中断");
            };
            channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
直连交换机(direct exchange)

一、direct excahnge 概念

我们知道交换机 exchange 与 queue 之间的关系叫做 binding ,绑定用参数 routeKey(路由键)表示。首先 routeKey 是交换机与队列之前的桥梁,那么简单的可以将其作为交换机只能通过特定的 routeKey 才能向特定的队列中转发消息。例如,日志按照日志级别来划分,error 级别的日志保存到磁盘,warning、info 级别日志在控制台输出。这个时候,routeKey 起到了一个消息分类的作用。

二、多重绑定

当然如果 exchange 的绑定类型是 direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多,如下图所示。

三、案例说明

1、生产者

package com.mq.rabbitmq.directexchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;


public class DirectProducer {

    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) {
        //初始化 routeKey(bindingKey)-message 的Map
        Map routeKeyMessageMap = new HashMap<>();
        routeKeyMessageMap.put("routeKey_info", "普通 info 日志信息");
        routeKeyMessageMap.put("routeKey_warning", "警告 warning 日志信息");
        routeKeyMessageMap.put("routeKey_error", "错误 error 日志信息");
        //debug没有消费者接受这个消息,所以就丢失了
        routeKeyMessageMap.put("routeKey_debug", "调试 debug 日志信息");

        try {
            //获取连接通道
            Channel channel = RabbitMQUtils.getChannel();
            //定义交换机名称及其类型
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //发送消息
            Set> entries = routeKeyMessageMap.entrySet();
            for (Map.Entry entry : entries) {
                String routeKey = entry.getKey();
                Object message = entry.getValue();
                //发送消息
                channel.basicPublish(EXCHANGE_NAME, routeKey, null, (message == null ? "" : message.toString()).getBytes("UTF-8"));
                System.out.println("生产者发送消息:" + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2、消费者
(1)写入 error 日志信息

package com.mq.rabbitmq.directexchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.apache.commons.io.FileUtils;

import java.io.File;


//将错误值保存到文件
public class DirectErrorConsumer {

    //定义交换机名称
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) {

        try {
            //获取连接通道
            Channel channel = RabbitMQUtils.getChannel();
            //定义队列名称:file_queue 的队列
            String queueName = "file_queue";
            channel.queueDeclare(queueName, false, false, false, null);
            //定义交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //将交换机与 queueName 绑定,绑定键(路由键) routeKey_error
            channel.queueBind(queueName, EXCHANGE_NAME, "routeKey_error");

            //消费回调
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    File file = new File("D:\rabbitmq_log.txt");
                    FileUtils.writeStringToFile(file, message, "UTF-8");
                    System.out.println("消费者一收到消息:" + message + ", 并将该消息写入磁盘");
                    //手动确认
                    channel.basicAck(deliveryTag, false);
                } catch (Exception e) {
                    //消费出错消息重会队列
                    channel.basicNack(deliveryTag, false, true);
                }
            };
            //中断回调
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("消息消费被中断");
            };

            channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

(2)控制台输出 info 、warning 日志信息

package com.mq.rabbitmq.directexchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;


//将info、warning等日志级别输出
public class DirectConsoleConsumer {

    //定义交换机名称
    private static final String EXChANGE_NAME = "direct_exchange";

    public static void main(String[] args) {
        try {
            //获取连接通道
            Channel channel = RabbitMQUtils.getChannel();
            //定义队列名称:console_queue 的队列
            String queueName = "console_queue";
            channel.queueDeclare(queueName, false, false, false, null);

            //定义交换机及其类型
            channel.exchangeDeclare(EXChANGE_NAME, BuiltinExchangeType.DIRECT);
            //将 queue_name 列绑定交换机,其中路由键为:routeKey_info、routeKey_warning
            channel.queueBind(queueName, EXChANGE_NAME, "routeKey_info");
            channel.queueBind(queueName, EXChANGE_NAME, "routeKey_warning");

            //消息消费回调
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println("消费者二收到消息:" + message);
                    //手动确认
                    channel.basicAck(deliveryTag, false);
                } catch (Exception e) {
                    e.printStackTrace();
                    //消费失败消息重回队列
                    channel.basicNack(deliveryTag, false, true);
                }
            };
            //消费者消费中断
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("消息消费被中断");
            };
            channel.basicConsume(queueName,false,deliverCallback,cancelCallback);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
主题交换机(topic exchange)

在日志系统中,我们不仅需要根据日志级别订阅日志,还希望从发出日志的源订阅日志。这时候,direct exchange 就办不到了,这个时候只能采用 topic exchange。

topic exchange 的要求: 发送类型是 topic exchange 的消息的 路由键 route_key 不能随便写,必须是一个单词列表,以点号隔开。这些单词可以是任意单词,比如说:stock.usd.nyse、nyse.vmw、quick.orange.rabbit 这种类型的。单词列表的长度不能超过 255 个字节。在这个规则列表中,其中有两个替换符:
1、* 可以代替一个单词
2、# 可以代替零个或多个
交换机类型是 topic 的时候,值得注意的是,①当一个队列的绑定键是 # 时,那么队列将接收所有的数据,有点像 fanout 。②当队列绑定键没有 # 和 * 时,那么该对列就像 direct 。

一、生产者

package com.mq.rabbitmq.topicexchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;


//主题交换机-生产者
public class TopicProducer {

    //定义交换机名称
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) {
        try {
            //获取连接通道
            Channel channel = RabbitMQUtils.getChannel();
            //定义创建交换机及类型
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, false, true, null);
            //定义路由键及其消息
            Map routeKeyMessage = new HashMap<>();
            routeKeyMessage.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
            routeKeyMessage.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
            routeKeyMessage.put("quick.orange.fox", "被队列 Q1 接收到");
            routeKeyMessage.put("lazy.brown.fox", "被队列 Q2 接收到");
            routeKeyMessage.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
            routeKeyMessage.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
            routeKeyMessage.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
            routeKeyMessage.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");

            //发送消息
            Set> entries = routeKeyMessage.entrySet();
            for (Map.Entry entry : entries) {
                //路由键(绑定键)
                String routeKey = entry.getKey();
                Object message = entry.getValue();
                channel.basicPublish(EXCHANGE_NAME, routeKey, null, (message == null ? "" : message.toString()).getBytes("UTF-8"));
                System.out.println("生产者发出消息" + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

二、消费者
1、接收路由键为 *.orange.* 的消息

package com.mq.rabbitmq.topicexchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;


//消费者一
public class TopicConsumerOne {

    //定义交换机名称
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) {

        try {
            //获取连接通道
            Channel channel = RabbitMQUtils.getChannel();
            //定义交换机及其类型
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC,false,true,null);
            //定义队列 Q1
            String queueName = "Q1";
            channel.queueDeclare(queueName, false, false, false, null);
            //绑定队列 routeKey = *.orange.*
            channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");

            System.out.println("消费者一等待接收消息......");

            //消费回调
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println("接收队列:" + queueName + "绑定键(路由键):" + deliveryTag + ", 消息:" + message);
                    //手动确认
                    channel.basicAck(deliveryTag, false);
                } catch (Exception e) {
                    e.printStackTrace();
                    channel.basicNack(deliveryTag, false, true);
                }
            };
            //消费中断回调
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("消费消息被中断");
            };

            channel.basicConsume(queueName, false, deliverCallback, cancelCallback);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2、接受路由键为 *.*.rabbit 与 lazy.# 的消息

package com.mq.rabbitmq.topicexchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;


//消费者二
public class TopicConsumerTwo {

    //定义交换机名称
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) {

        try {
            //获取连接通道
            Channel channel = RabbitMQUtils.getChannel();
            //定义交换机及其类型
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC,false,true,null);
            //定义队列 Q1
            String queueName = "Q2";
            channel.queueDeclare(queueName, false, false, false, null);
            //绑定队列 routeKey = *.orange.* / lazy.#
            channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
            channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");

            System.out.println("消费者二等待接收消息......");

            //消费回调
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println("接收队列:" + queueName + "绑定键(路由键):" + deliveryTag + ", 消息:" + message);
                    //手动确认
                    channel.basicAck(deliveryTag, false);
                } catch (Exception e) {
                    e.printStackTrace();
                    channel.basicNack(deliveryTag, false, true);
                }
            };
            //消费中断回调
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("消费消息被中断");
            };

            channel.basicConsume(queueName, false, deliverCallback, cancelCallback);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
死信队列 概念及应用场景

死信,顾名思义就是无法被消费的消息。一般来说说 producer 将消息发送到 broker 或者直接到 queue 里,consumer 从 queue 取出消息进行消费,但是由于某种特定原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。应用场景如下:

(1)为了保证订单业务的消息数据不丢失,需要用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。
(2)用户在商城下单成功并点击去支付后,在指定的时间段内未支付时,自动失效。

死信来源

1、消息 TTL(Time to live【生存时间值】)过期,即消息的生存时间过期。
2、队列达到最大长度(队列满了,无法在添加消息到队列 mq 中)。
3、消息被拒绝(channel.basicReject() 或channel.basicNack() )并且 requeue = false

死信实例

一、架构图

消息 TTL 过期

一、生产者

package com.mq.rabbitmq.deadmq.msgttl;

import com.mq.rabbitmq.deadmq.RabbitMQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

//消息生产者
public class MsgTTLProducer {

    //定义交换机名称
    private static final String EXCHANGE_NAME = "normal_exchange";

    //定义路由键
    private static final String ROUTE_KEY = "normal_route_key";


    public static void main(String[] args) {
        try {
            //获取连接通道
            Channel channel = RabbitMQUtils.getChannel();
            //设置交换机名称及其类型
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //设置消息的TTL时间,等待 10 秒后发送到 normal_route_key 路由键的消息过期
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
            //该信息是用作演示队列个数限制
            for (int i = 0; i < 10; i++) {
                String message = "info" + i;
                //发送消息到交换机
                channel.basicPublish(EXCHANGE_NAME, ROUTE_KEY, properties, message.getBytes("UTF-8"));
                System.out.println("生产者发送消息:" + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

二、消费者
1、正常消费

package com.mq.rabbitmq.deadmq.msgttl;

import com.mq.rabbitmq.deadmq.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.util.HashMap;
import java.util.Map;

//正常消息消费者
public class MsgTTLNormalConsumer {

    //定义正常交换机的名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //定义死信队列交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    //定义正常队列名称
    private static final String NORMAL_QUEUE = "normal_queue";
    //定义失败队列名称
    private static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) {

        try {
            //获取连接通道
            Channel channel = RabbitMQUtils.getChannel();
            //定义正常(死信)交换机及其类型
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
            //定义(死信)队列
            channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
            //死信队列与死信交换机绑定
            channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead_route_key");

            //正常队列绑定死信队列消息
            Map params = new HashMap<>();
            //正常队列设置死信交换机 参数 key 为固定值
            params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
            //正常队列设置死信 routing-key 参数 key 为固定值
            params.put("x-dead-letter-routing-key", "dead_route_key");

            //定义正常队列
            channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
            //正常队列绑定正常交换机
            channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal_route_key");

            System.out.println("等待接收消息......");
            //确认回调
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println("消费者 1 接收到消息:" + message);
                    channel.basicAck(deliveryTag, false);
                } catch (Exception e) {
                    e.printStackTrace();
                    channel.basicNack(deliveryTag, false, true);
                }
            };
            //中断回调
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("消息消费被中断");
            };
            channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

2、死信消费

package com.mq.rabbitmq.deadmq.msgttl;

import com.mq.rabbitmq.deadmq.RabbitMQUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

//正常消息消费者
public class MsgTTLDeadConsumer {

    //定义死信队列交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    //定义失败队列名称
    private static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) {

        try {
            //获取连接通道
            Channel channel = RabbitMQUtils.getChannel();
            //(死信)交换机及其类型
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
            //定义(死信)队列
            channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
            //死信队列与死信交换机绑定
            channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead_route_key");


            System.out.println("等待接收死信队列的消息......");
            //确认回调
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println("消费者 2 接收到消息:" + message);
                    channel.basicAck(deliveryTag, false);
                } catch (Exception e) {
                    e.printStackTrace();
                    channel.basicNack(deliveryTag, false, true);
                }
            };
            //中断回调
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("消息消费被中断");
            };
            channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

以下案例只展示关键概念和关键代码。

队列达到最大长度

参考 消息 TTL 过期 ,在 normal 消费者中加入如下代码可控制,正常消费队列的最大长度:

消息被拒绝,并且 requeue = false

消息确认的时候,加入如下代码,消息加入死信队列:

延时队列 概念

延时队列的内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望指定时间到了以后或之前去除和处理。简单说,延时队列就是用来存放指定时间被处理的元素的队列。

应用场景

1、订单在十分钟之内未支付自动取消。
2、新创建的店铺如果在十天之内都没有上传过商品,则自动发送消息提醒。
3、用户注册成功后没如果三天内没有登录则短信提醒。
4、用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5、预定会议后们需要在预定时间点前十分钟通知各个会议人员参加会议。

RabbitMQ 中的 TTL

TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有
消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。

消息设置 TTL: 指的是针对每条消息设置消息的最大存活时间。
队列设置 TTL: 指的是在创建队列的时候设置队列的 x-message-ttl 属性。
二者的区别: 如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

延时队列的详细介绍:https://blog.csdn.net/Zero_dot_degree/article/details/107521638

发布确认高级


代码地址:https://github.com/1914526816lhw/cloud-microservice/tree/master/cloud-stream-rabbitmq-provider8801/src/main/java/com/atguigu/springcloud/messagequeue

RabbitMQ的其他知识点 幂等性

1、 概念
用户对于同一 *** 作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据 *** 作放入事务中即可,发生错误
立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等

2、消息重复消费
消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

3、 解决思路
MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单消费者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消息时用该 id 先判断该消息是否已消费过。
4、 消费端的幂等性保障
在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。业界主流的幂等性有两种 *** 作:a.唯一 ID+指纹码机制,利用数据库主键去重, b.利用 redis 的原子性去实现

5、 唯一 ID+指纹码机制
指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。

6、 Redis 原子性
利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费

优先队列

1、使用场景
在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存放的定时轮询,大家都知道 redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级。

2、如何添加
(1)队列添加优先级

Map params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare("hello", true, false, false, params);

(2)消息添加优先级

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
惰性队列

1、使用场景
RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个 *** 作会耗费较长的
时间,也会阻塞队列的 *** 作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。

2、两种模式
队列具备两种模式:default 和 lazy。默认的为 default 模式,在 3.6.0 之前的版本无需做任何变更。lazy模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。下面示例中演示了一个惰性队列的声明细节:

Map args = new HashMap();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args)

3、内存开销对比

在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅占用 1.5MB 。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5705725.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存