RabbitMQ实战教程(视频学习笔记)

RabbitMQ实战教程(视频学习笔记),第1张

RabbitMQ实战教程(视频学习笔记) RabbitMQ实战教程

本文为学习RabbitMQ时记录的视频中笔记,本人亲自实验并修改了某些地方。用的centos和rabbitmq的版本和视频中不一致,看到的小伙伴可以给视频个三连支持up主哈,视频地址:https://www.bilibili.com/video/BV1dE411K7MG?from=search&seid=15593601763323732951&spm_id_from=333.337.0.0

1.MQ引言 1.1 什么是MQ

​ MQ(Message Queue):翻译为消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的消费和生产,没有业务逻辑的侵入,轻松的实现系统间的解藕。别名为:消息中间件,通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

1.2 MQ有哪些

​ 当今市面上有很多的消息中间件,如老牌的ActiveMQ,RabbitMQ,炙手可热的Kafka、阿里巴巴自主研发的RocketMQ等。

1.3 不同MQ的特点
# 1.ActiveMQ
	ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。他是一个完全支持JMS规范的消息中间件。丰富的API,多种集群架构模式让ActiveMQ称为老牌的消息中间件,在中小企业颇受欢迎。
# 2.Kafka
	Kafka是linkedIn公司开源的分布式发布-订阅消息中间件,目前属于Apache顶级项目。Kafka的主要特点是基于pull模式来处理消息消费。追求高吞吐量,一开始的目的是用于日志的收集和传输。0.8版本开始支持复制,不支持事物,对消息的丢失、重复、错误没有严格的要求。适合产生大量数据的互联网服务的数据收集业务。
# 3.RocketMQ
	RocketMQ是阿里开源的消息中间件。他是纯java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ的思路起源于Kafka,但并不是Kafka的一个复制,他对消息的可靠传输及事物做了优化,目前的阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流处理、binglog分发等场景。
# 4.RabbitMQ
	RabbitMQ是使用Erlang语言开发的开源消息中间件系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布订阅)、可靠性、安全。AMQP协议更多用在企业内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(延时低)、可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。

2.RabbitMQ的引言 2.1 RabbitMQ简介

​ RabbitMQ是基于AMQP协议,erlang语言开发,是部署最广泛的开源消息中间件,也是最受欢迎的消息中间件之一。

# AMQP协议
	AMQP(Advanced Message Queuing Protocol,高级消息队列协议),在2003年被提出,在最用于解决金融领域不同平台之间消息传递的交互问题。顾名思义,AMQP是一种协议,更准确的说是一种binary wrie-level protocol(链接协议)。这时其和JMS的本质差异,AMQP不从api层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。
								Server
					+---------------------------+
					|		Virtual Host		|
					|	+--------------------+  |
   	+------------+  |	|	+-----------+    |	|
	| Publisher  | -------->| Exchange	|    |  |
	| Application|	|	|	+-----+-----+    |  |
	+------------+	|   | 		  |			 |	|
					|	|	+-----------+	 |	|	+-------------+
					|	|	+  Message	+	 |	|	|  Consummer  |
					|	|	+	Queue	+ --------->| Application |
					|	|	+-----------+	 |	|	+-------------+
					|	+--------------------+	|
					+---------------------------+
2.2 RabbitMQ的安装

​ 本次安装的环境如下:

  • 系统:centos8 64位
  • erlang:24.1.7
  • rabbitmq:3.9.11
2.2.1 erlang下载

因为rabbitMQ是基于erlang开发的,所以先要下载erlang的包:https://github.com/rabbitmq/erlang-rpm

erlang版本和rabbitmq版本对照:https://www.rabbitmq.com/which-erlang.html

​ 本次开发使用的最新的rabbitmq版本为3.9.11,最小支持的erlang版本为23.2,所以本次erlang使用了24.1.7版本

2.2.2 RabbitMQ下载

rabbitmq下载地址:https://www.rabbitmq.com/install-rpm.html#downloads

2.2.3 安装启动
# 1.将rabbitmq相关包上传到linxu服务器中
	使用scp命令上传到服务器两个包:
	scp ./erlang-24.1.7-1.el8.x86_64.rpm root@10.3.4.5:/root/
	scp ./rabbitmq-server-3.9.11-1.el8.noarch.rpm root@10.3.4.5:/root/
# 2.依次安装erlang、rabbitmq
	使用rpm命令进行安装,缺少依赖会进行提示:
	rpm -ivh erlang-24.1.7-1.el8.x86_64.rpm
	rpm -ivn rabbitmq-server-3.9.11-1.el8.noarch.rpm
# 3.修改rabbitmq的配置
	使用rpm包的方式安装时,并没有将配置文件也放到指定目录下,所以需要自行创建一个配置文件:/etc/rabbitmq/rabbitmq.conf,具体的配置内容可以参考:https://www.rabbitmq.com/configure.html#config-file-formats。
	这里我们需要修改一处:loopback_users=none,表示能够让guest用户进行远程访问。默认情况下,guest用户只能在localhost域名下访问。我们使用的是云服务器,需要使用ip进行访问,所以需要修改这个配置。
# 4.开启管理控制台插件
	其实就是开启rabbitmq的一个插件:rabbitmq_management,可以让我们使用web界面管理rabbitmq。执行命令:rabbitmq-plugins enable rabbitmq_management。该命令还另外开启了2个插件:rabbitmq_management_agent、rabbitmq_web_dispatch。
# 5.启动/停止/重启rabbitmq服务
	rabbitmq安装的时候,会将其设置为系统服务,使用系统服务命令即可:
	systemctl start/stop/restart rabbitmq-server.service
# 6.查看服务状态
	systemctl stauts rabbitmq-server.service
    结果如下为正常运行中:
    ● rabbitmq-server.service - RabbitMQ broker
       Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
       Active: active (running) since Sun 2021-12-12 23:56:51 CST; 1 day 9h ago
      Process: 22710 ExecStop=/usr/sbin/rabbitmqctl shutdown (code=exited, status=0/SUCCESS)
     Main PID: 22758 (beam.smp)
        Tasks: 23 (limit: 23722)
       Memory: 94.8M
       CGroup: /system.slice/rabbitmq-server.service
               ├─22758 /usr/lib64/erlang/erts-12.1.5/bin/beam.smp -W w -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zdbbl 12800>
               ├─22773 erl_child_setup 32768
               ├─22827 inet_gethost 4
               └─22828 inet_gethost 4
# 7.访问管理界面
 	默认http后台界面的端口为15672。
 	http://10.3.4.5:15672/
# 8.登陆
	账号/密码:guest/guest
3.RabbitMQ配置 3.1 RabbitMQ管理命令行
# 1.服务管理
	systemctl start/stop/restart rabbitmq-server.service
# 2.管理命令行
	可以用来在不使用web管理端的情况下管理rabbitmq。
	rabbitmqctl help // 查看所有的命令
# 3.插件管理
	rabbitmq-plugins enable/list/disable
3.2 web管理介绍

  • Connection:连接,无论是消费者还是生产者,都要与rabbitmq建立连接才能进行消息的生产与消费。
  • Channels:通道,建立连接后,消息的投递和获取都是通过通道来进行的。
  • Exchages:交换机,用来实现消息的路有。
  • Queues:队列,消息存放于该队列中,等待消费,消费之后移除。
3.2.1用户和虚拟主机管理 3.2.1.1 用户

# Tags说明:
	Admin(超级管理员):登录控制台,查看所有信息,并对用户、策略(policy)进行修改。
	Monitoring(监控者):登陆控制台,查看rabbitmq的节点(进程数、内存、磁盘等使用情况)信息。
	Policymaker(策略制定者):登陆控制台,对policy进行管理。
	Management(普通管理员):登陆控制台查看信息。
	其他:无法登陆控制台,就是普通的消费者或者生产者。
3.2.1.2 虚拟主机

# 虚拟主机:
	为了让各个用户可以互不干扰的工作,RabbitMQ添加了虚拟主机(Virtual Host)概念。其实就是一个独立的访问路径,不同用户使用不同路径,各自都有自己的队列、交换机,互不影响。相当于MySQL中的数据库。
3.2.1.3 用户和虚拟主机绑定

4.RabbitMQ的Java客户端 4.1 AMQP协议回顾

4.2 RabbitMQ支持的消息类型

4.3 引入依赖

  com.rabbitmq
  amqp-client
  5.14.0

4.4 各种模型客户端代码 4.4.1 Direct-直连模式

# 相关概念:
	P:provider,生产者,要发送消息的程序
	C:consumer,消费者,消费消息的程序
	Queue:队列,存储消息的地方

Producer:

public class TestProducer {
    // 定义队列名字
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/adu");
        connectionFactory.setUsername("adu");
        connectionFactory.setPassword("adu");
        // 2.创建连接
        Connection connection = connectionFactory.newConnection();
        // 3.创建通道
        Channel channel = connection.createChannel();
        // 4.声明队列:队列名称,是否持久化,是否独占,额外参数
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 5.向队列发送消息:交换机名称,队列名称,额外参数,消息体
        channel.basicPublish("", QUEUE_NAME, null, "hello world".getBytes(StandardCharsets.UTF_8));
        // 6.关闭资源
        channel.close();
        connection.close();
    }
}

Consumer:

public class TestConsumer {
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 创建连接工厂,并设置基础参数
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/adu");
        connectionFactory.setUsername("adu");
        connectionFactory.setPassword("adu");
        // 2 创建连接
        Connection connection = connectionFactory.newConnection();
        // 3 创建通道
        Channel channel = connection.createChannel();
        // 4 声明队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 4 接收消息
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            // 回调方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer:" + new String(body, StandardCharsets.UTF_8));
            }
        });
        // 5 关闭资源
        channel.close();
        connection.close();
    }
}
4.4.2 Work Queues-工作队列

​ Work Queue也被称为Task Queue任务模型。当消息处理比较耗时时,可能消息生产的速度远远超过了消息消费的速度。长此以往,消息就会堆积,此时就可以使用Work Queue模型。让多个消费者绑定同一个队列,共同消费队列中的消息。队列中的消息一旦被消费,就会被删除,因此不会产生重复消费。

// 提取连接rabbitmq的工具类
public class RabbitMQUtil {

    public static Connection getConnection()    {
        try {
            // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // 设置参数
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/adu");
            connectionFactory.setUsername("adu");
            connectionFactory.setPassword("adu");
            // 创建连接
            return connectionFactory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static Channel getChannel(Connection connection) {
        try {
            return connection.createChannel();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void close(Channel channel, Connection connection) {
        if (channel != null) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

Producer:

public class WorkQueueProducer {

    public static final String QUEUE_NAME = "work";

    public static void main(String[] args) throws IOException {
        // 获取连接
        Connection connection = RabbitMQUtil.getConnection();
        // 获取通道
        Channel channel = RabbitMQUtil.getChannel(connection);
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 发消息
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("", QUEUE_NAME, null, ("hello work queue " + i).getBytes(StandardCharsets.UTF_8));
        }
        // 关闭资源
        RabbitMQUtil.close(channel, connection);
    }
}

Consumer:

// consumer1
public class WorkQueueConsumer1 {
    public static final String QUEUE_NAME = "work";
    public static void main(String[] args) throws IOException {
        // 获取连接
        Connection connection = RabbitMQUtil.getConnection();
        // 获取管道
        Channel channel = RabbitMQUtil.getChannel(connection);
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 消费消息
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer1: " + new String(body, StandardCharsets.UTF_8));
            }
        });
    }
}

// consumer2
和consumer1相同代码,修改消费输出:System.out.println("consumer2: " + new String(body, StandardCharsets.UTF_8));

结果:

# consumer1 output...
consumer1: hello work queue 0
consumer1: hello work queue 2
consumer1: hello work queue 4
consumer1: hello work queue 6
consumer1: hello work queue 8
# consumer2 output...
consumer2: hello work queue 1
consumer2: hello work queue 3
consumer2: hello work queue 5
consumer2: hello work queue 7
consumer2: hello work queue 9

总结:

​ 默认情况下,RabbitMQ将按顺序依次将消息发给每个消费者,也就是说每个消费者都会收到相同数量的消息,这种平分消息的方式称为循环。

4.4.2.1 消息自动确认机制

​ 如果一个消费者在执行任务的过程中宕机,那么在以上代码中,一旦RabbitMQ将将消息传递给消费者,他就会立即标记为删除,那么该消费者中未处理的消息将丢失。我们希望在处理任务的过程中,一个消费者宕机了,能够将任务交给其他消费者处理。这时候我们就可以用到消息自动确认机制。也就是说,我们可以在处理完任务之后,再通知RabbitMQ可以删除该任务。

Producer:

// 同上

Consumer:

// consumer1
public class WorkQueueConsumer3 {
    public static final String QUEUE_NAME = "work";
    public static void main(String[] args) throws IOException {
        // 获取连接
        Connection connection = RabbitMQUtil.getConnection();
        // 获取管道
        Channel channel = RabbitMQUtil.getChannel(connection);
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 每次只能消费一个消息
        channel.basicQos(1);
        // 消费消息
        channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000); // 模拟处理消息消耗时常
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("consumer1: " + new String(body, StandardCharsets.UTF_8));
                // 手动确认,参数1:消息标记 参数2:是否同时确认多条,false表示每次只能确认一条
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }

}
// consumer2
public class WorkQueueConsumer4 {
    public static final String QUEUE_NAME = "work";
    public static void main(String[] args) throws IOException {
        // 获取连接
        Connection connection = RabbitMQUtil.getConnection();
        // 获取管道
        Channel channel = RabbitMQUtil.getChannel(connection);
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 限制每次消费1条
        channel.basicQos(1);
        // 消费消息,autoAck改为false,表示不自动确认
        channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("consumer1: " + new String(body, StandardCharsets.UTF_8));
                // 手动确认
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}

结果:

# consumer1 output...
consumer1: hello work queue 1
consumer1: hello work queue 3
consumer1: hello work queue 5
consumer1: hello work queue 8
consumer1: hello work queue 10
consumer1: hello work queue 13
consumer1: hello work queue 15
consumer1: hello work queue 18
# consumer2 output...
consumer1: hello work queue 0
consumer1: hello work queue 2
consumer1: hello work queue 4
consumer1: hello work queue 6
consumer1: hello work queue 7
consumer1: hello work queue 9
consumer1: hello work queue 11
consumer1: hello work queue 12
consumer1: hello work queue 14
consumer1: hello work queue 16
consumer1: hello work queue 17
consumer1: hello work queue 19

总结:

​ 实现消息自动确认,需要注意两点:1)设置通道一次只能消费一条消息 2)关闭自动确认,手动确认消息

4.4.3 Publish/Subscribe-发布订阅模式

# 消息流程:
	可以有多个消费者,每个消费者都有自己的queue(队列),每个队列都要绑定要exchange(交换机)。
	生产者发送的消息只能发送给交换机,交换机来决定发送给哪个队列,生产者无法决定。
	交换机把消息发送给绑定过的所有队列,队列的消费者都能拿到消息,实现一个消息被多个消费者消费。
# 对应交换机类型:fanout

Proudcer:

public class PublishSubscriptProducer {

    public static final String QUEUE_NAME = "fanout";

    public static void main(String[] args) throws IOException {
        // 获取连接
        Connection connection = RabbitMQUtil.getConnection();
        // 获取通道
        Channel channel = RabbitMQUtil.getChannel(connection);
        // 声明交换机 交换机名称 交换机类型
        channel.exchangeDeclare("logs", "fanout");
        // 发送消息到交换机
        channel.basicPublish("logs", "", null, "hello fanout".getBytes(StandardCharsets.UTF_8));
        // 关闭资源
        RabbitMQUtil.close(channel, connection);
    }
}

Consumer:

// consumer1
public class PublishSubscribeConsumer1 {

    public static void main(String[] args) throws IOException {
        // 获取连接
        Connection connection = RabbitMQUtil.getConnection();
        // 获取通道
        Channel channel = RabbitMQUtil.getChannel(connection);
        // 声明交换机
        channel.exchangeDeclare("logs", "fanout");
        // 创建临时队列
        String queue = channel.queueDeclare().getQueue();
        // 绑定交换机和队列 队列名称 交换机名称 路由
        channel.queueBind(queue, "logs", "");
        // 接收消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer1: " + new String(body, StandardCharsets.UTF_8));
            }
        });
    }
}

// consumer2
同consumer1,输出修改为:System.out.println("consumer2: " + new String(body, StandardCharsets.UTF_8));

结果:

# consumer1 output...
consumer1: hello fanout, type [
consumer1: hello fanout

# consumer2 output...
consumer2: hello fanout, type [
consumer2: hello fanout
4.4.4 Routing-路由模式

​ 在发布订阅模式中,一条消息会被所有订阅的的队列消费。但是在某些场景下,我们希望不同的消息类型被不同的队列消费,这时就需要用到Routing模式的direct类型的Exchange。

# 在Direct模式下:
	队列与交换机的绑定,不能是任意绑定了,而是需要指定一个Routing Keying,消息发送方在向Exchange发送消息时,也必须指定消息的Routing Key。
	Exchange也不再把消息交给每一个与之绑定的队列,而是根据消息的Routing Key判断,只有队列的Routing Key和消息的Routing Key完全一致时,才会发送消息。

# 说明:
	P:Proudcer,生产者,向Exchange发送消息,同时会指定一个Routing Key。
	X:Exchange,交换机,接收生产者发送的消息,然后把消息传递给与routing key完全匹配的队列。
	C1:Consumer,消费者,其所在队列指定了需要routing key为error的消息
	C2:Consumer,消费者,其所在队列指定了需要routing key为info、error、warning的消息

Producer:

public class RoutingProducer {

    public static void main(String[] args) throws IOException {
        // 获取连接
        Connection connection = RabbitMQUtil.getConnection();
        // 获取管道
        Channel channel = RabbitMQUtil.getChannel(connection);
        // 声明交换机
        channel.exchangeDeclare("logs_direct", "direct");
        // 发送消息
        String routingKey = "warning";
        channel.basicPublish("logs_direct", routingKey, null, String.format("routing for type [%s]", routingKey).getBytes(StandardCharsets.UTF_8));
        // 关闭资源
        RabbitMQUtil.close(channel, connection);
    }
}

Consumer:

// consumer1
public class RoutingConsumer1 {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = RabbitMQUtil.getChannel(connection);
        channel.exchangeDeclare("logs_direct", "direct");
        String queue = channel.queueDeclare().getQueue();
        // consumer1绑定routing key 为info
        channel.queueBind(queue, "logs_direct", "info");
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer1: " + new String(body, StandardCharsets.UTF_8));
            }
        });
    }
}

// consumer2
public class RoutingConsumer2 {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = RabbitMQUtil.getChannel(connection);
        channel.exchangeDeclare("logs_direct", "direct");
        String queue = channel.queueDeclare().getQueue();
        // consumer1绑定routing key 为info、error、warning
        channel.queueBind(queue, "logs_direct", "info");
        channel.queueBind(queue, "logs_direct", "error");
        channel.queueBind(queue, "logs_direct", "warning");
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer2: " + new String(body, StandardCharsets.UTF_8));
            }
        });
    }
}

结果:

# 依次发送routing key为info、error、warning结果:

## consummer1
consumer1: routing for type [info]

## consumer2
consumer1: routing for type [info]
consumer1: routing for type [error]
consumer1: routing for type [warning]
4.4.5 Topics-主题(动态路由模式)

​ Topics模式同Routing模式相比,都可以根据Routing Key把消息路由到不同的队列。只不过Topic类型的Routing Key可以使用通配符。Routing Key一般是由一个或多个单词组成,多个单词之间以‘,’分割。Exchange的类型为topic。

# 通配符:
	*(star)can substitute for exactly one word。精确匹配一个单词。
	#(hash)can substitute for zero or more words。匹配零个、一个或多个单词。
## 例如:
	audit.# :匹配 audit、audit.irs、audit.irs.corporate 等。
	audit.* :只能匹配 audit.irs。

Producer:

public class TopicsProducer {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = RabbitMQUtil.getChannel(connection);
        channel.exchangeDeclare("logs_topics", "topic");
        String routingKey = "user.save.now";
        channel.basicPublish("logs_topics", routingKey, null, String.format("hello topics, routing key : [%s]", routingKey).getBytes(StandardCharsets.UTF_8));
        RabbitMQUtil.close(channel, connection);
    }
}

Consumer:

// consumer1
public class TopicsConsumer1 {

    public static final String EXCHANGE_NAME = "logs_topics";

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = RabbitMQUtil.getChannel(connection);
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queue = channel.queueDeclare().getQueue();
        // 队列和交换机绑定的路由为 user.*
        channel.queueBind(queue, EXCHANGE_NAME, "user.*");
        channel.basicConsume(queue, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer1: " + new String(body, StandardCharsets.UTF_8));
            }
        });
    }
}

// consumer2
public class TopicsConsumer2 {

    public static final String EXCHANGE_NAME = "logs_topics";

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = RabbitMQUtil.getChannel(connection);
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queue = channel.queueDeclare().getQueue();
        // 队列和交换机绑定的路由为 user.*
        channel.queueBind(queue, EXCHANGE_NAME, "user.#");
        channel.basicConsume(queue, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer2: " + new String(body, StandardCharsets.UTF_8));
            }
        });
    }
}

结果:

# 依次发送routing key为user.save、user、user.save.now的结果:

## conumser1:
consumer1: hello topics, routing key : [user.save]

## consumer2:
consumer2: hello topics, routing key : [user.save]
consumer2: hello topics, routing key : [user]
consumer2: hello topics, routing key : [user.save.now]
5.SpringBoot重使用RabbitMQ 5.1 搭建环境 5.1.1 引入依赖

    org.springframework.boot
    spring-boot-starter-amqp

5.1.2 配置信息
spring:
  application:
    name: adu-rabbitmq
  rabbitmq:
    host: 10.2.3.4
    port: 5672
    username: adu
    password: adu
    virtual-host: /adu
5.2 各种模型客户端代码 5.2.1 Hello World-直连模型
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "hello"))
public class HelloWorldConsumer {

    @RabbitHandler
    public void receive(String message) {
        System.out.println("consumer: " + message);
    }

}
5.2.2 Work Queue - 工作队列模式
@Component
public class WorkQueueConsumer {

    @RabbitListener(queuesToDeclare = @Queue(value = "work", durable = "true", exclusive = "false", autoDelete = "false"))
    public void receive1(String message) {
        System.out.println("consumer1: " + message);
    }

    @RabbitListener(queuesToDeclare = @Queue(value = "work", durable = "true", exclusive = "false", autoDelete = "false"))
    public void receive2(String message) {
        System.out.println("consumer2: " + message);
    }

}
5.2.3 Publish/Subscribe - 发布订阅模式
@Component
public class PublishSubscribeConsumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, // 没有参数的,表示临时队列
            exchange = @Exchange(value = "logs", type = "fanout") // 交换机
    ))
    public void receive1(String message) {
        System.out.println("consumer1: " + message);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, // 没有参数的,表示临时队列
            exchange = @Exchange(value = "logs", type = "fanout") // 交换机
    ))
    public void receive2(String message) {
        System.out.println("consumer2: " + message);
    }
}
5.2.4 Routing - 路由模式(direct)
@Component
public class RoutingConsumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, // 临时队列
            exchange = @Exchange(value = "logs_direct", type = "direct"),
            key = "info" // routing key
    ))
    public void receive1(String message) {
        System.out.println("consumer1: " + message);
    }


    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, // 临时队列
            exchange = @Exchange(value = "logs_direct", type = "direct"),
            key = {"info", "error", "warning"} // routing key
    ))
    public void receive2(String message) {
        System.out.println("consumer2: " + message);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, // 临时队列
            exchange = @Exchange(value = "logs_direct", type = "direct"),
            key = {"warning"}
    ))
    public void receive3(String message) {
        System.out.println("consumer3: " + message);
    }
}
5.2.5 Topics - 主题(动态路由)模式
@Component
public class TopicsConsumer {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, // 临时队列
            exchange = @Exchange(value = "logs_topics", type = "topic"),
            key = {"user.*"}
    ))
    public void receive1(String message) {
        System.out.println("consumer1: " + message);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, // 临时队列
            exchange = @Exchange(value = "logs_topics", type = "topic"),
            key = {"user.#"}
    ))
    public void receive2(String message) {
        System.out.println("consumer2: " + message);
    }

}
5.2.6 客户端测试代码
@SpringBootTest(classes = RabbitMQApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@RunWith(SpringRunner.class)
@EnableAutoConfiguration
public class RabbitMQApplicationTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 直连模式
    @Test
    public void testHelloWorld() {
        rabbitTemplate.convertAndSend("hello", "hello spring boot starter amqp...");
    }
	// Work Queue模式
    @Test
    public void testWorkQueue() {
        for (int i = 0; i < 20; i++) {
            rabbitTemplate.convertAndSend("work", "hello work queue" + i);
        }
    }
	// Publish/Subscribe模式
    @Test
    public void testPublishSubscribe() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("logs", "", "hello publish subscribe " + i);
        }
    }
	// Routing模式
    @Test
    public void testRouting() {
        String routingKey = "warning";
        rabbitTemplate.convertAndSend("logs_direct", routingKey, "hello routing, type=" + routingKey);
    }
	// Topics模式
    @Test
    public void testTopics() {
        String routingKey = "user.save";
        rabbitTemplate.convertAndSend("logs_topics", routingKey, "hello topics, type=" + routingKey);
    }
}
6.MQ的应用场景 6.1 异步处理
# 场景说明:
	用户注册后,需要发送邮件和注册短信,传统的做法有2种:1)串行方式 2)并行方式
## 串行方式
	将注册信息写入数据库后,再发送邮件,再注册短信,以上三个任务全部返回结果后,才返回给客户端。这里有一个问题,邮件、短信并不是必须的,它只是一个通知,而这种做法需要让客户端等待没有必要等待的东西。
## 并行方式
	将注册信息写入数据库后,发送邮件的同时,发送短信。以上三个任务完成后,返回给客户端,并行的方式能提高处理的效率。

# 消息队列:
	除了以上提到的2种传统方式之外,我们还可以选择 消息队列 的形式。因为发送邮件和发送短信不是必须的,所以我们可以使用消息队列的形式,进行异步处理,相对于并行方式,减少了发送邮件或者短信的时间,相当于只有 注册信息写入数据库 + 写入消息队列 这个 *** 作的耗时。

6.2 应用解藕

# 场景:
	双十一是购物节,用户下单后,订单系统需要通知库存系统,传统做法是订单系统调用库存系统提供的接口。
# 缺点:
	这种做法有个缺点:当库存系统异常时,订单系统就无法使用。订单系统和库存系统就出现了耦合。解决方法引入消息队列。

# 引入消息队列:
	订单系统:用户下单后,订单系统完成持久化后,将消息写入 消息队列 ,返回用户下单成功。
	库存系统:订阅下单消息,获取下单消息,进行库存 *** 作。就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。
6.3 流量削峰
# 场景:
	秒杀活动,一般会因为流量过大,导致应用挂掉。为了解决这个问题,一般在应用之前加入消息队列。
# 作用:
	1.可以控制活动人数,超过一定阈值的订单直接丢弃
	2.可以缓解短时间的高流量压垮应用(程序按自己处理能力获取订单)。
# 流程:
	1.用户的请求,服务器收到之后先加入到消息队列。假如超过消息队列的最大长度,则直接丢弃用户请求或者跳转到错误页面。
	2.秒杀业务根据消息队列中的请求信息,再做后续的处理。
7.RabbitMQ集群 7.1 集群架构(副本集群)

默认情况下:RabbitMQ代理 *** 作所需的所有数据/状态都将跨所有节点复制。这方面的一个例外是 消息队列,默认情况下,消息队列位于一个节点上,尽管他们可以从任意节点上看到和访问。–官网

也就是说,集群架构的方式只能在节点之间同步(复制)交换机等基本信息,无法同步队列信息。而队列信息是保存数据的地方,如果无法在节点之间进行复制,那么集群将失去一些意义。

7.1.1 架构图

# 核心解决问题:
	当集群中某一时刻Master节点宕机,可以对Queue中信息进行备份。
7.1.2 集群搭建
PS:使用docker安装
# 0.集群规划
	node1:  172.18.0.2 5672 15672 		master 主节点
	node2:	172.18.0.3 5673 15673		repl1  副本节点
	node3:	172.18.0.4 5674 15674	 	repl2  副本节点
# 1.克隆三台主机名和ip映射,并修改各个虚拟机的hostname
	创建网络:
		docker network create rabbit-net
	创建容器:
		docker run -d --privileged=true --hostname node1 --name node1 -p 15672:15672 -p 5672:5672 --network rabbit-net centos:8.2.2004 /sbin/init
	进入容器:
		docker exec -it node/node1/node2 /bin/bash
	复制erlang、rabbit的rpm包到容器:
		docker cp ./erlang.xxx.rpm node1:/
		docker cp ./rabbitmq.xxx.rpm node1:/
		(可到资源中下载这两个包:https://download.csdn.net/download/DMaker1993/64905661)
	安装:
		rpm -ihv erlang.xxx.rpm
		rpm -ivh rabbitmq.xxx.rpm --nodeps --force
	配置:
		在/etc/hosts加入各个节点的ip和域名对应关系。
	问题:
		参考:https://blog.csdn.net/weixin_42181917/article/details/105579288
		1.System has not been booted with systemd as init system (PID 1). Can't operate.Failed to create bus connection: Host is down
		解决:创建容器使用 /sbin/init
		2.Could not set property: Failed to set static hostname: Device or resource busy
		解决:退出容器,重新进入在设置一次
			hostnamectl set-hostname node/node1/node2
			exit
			hostnamectl set-hostname node/node1/node2
# 2.三台机器安装rabbitmq,并同步cookie文件
	docker cp 命令将宿主机中的erlang和rabbitmq的rpm包都上传到虚拟机中,启动rabbitmq。
	docker cp 命令将node1节点中的 /var/lib/rabbitmq/.erlang.cookie文件复制到宿主机,再通过宿主机复制到node1和node2.
	这里需要注意看是否.erlang.cookie文件的权限是否和node相同:chown、chgrp 命令可以修改权限。
# 3.查看cookie文件是否一致
	进入三个节点的虚拟机中,执行: cat /var/lib/rabbitmq/.erlang.cookie,查看内容是否一致。
# 4.后台启动rabbit
	rabbitmq-server -detached
# 5.在node2和node3执行加入集群命令
	1.关闭		rabbitmqctl stop_app
	2.加入集群	   rabbitmqctl join_cluster rabbit@node
	3.启动服务	   rabbitmqctl start_app
	

搭建成功:

7.2 镜像集群

镜像队列机制就是将队列在三个节点之间设置主从关系,消息会在三个节点之间自动进行同步。且如果其中一个节点不可用,并不会导致消息丢失或者服务不可用的情况,提升MQ集群整体的高可用性。–摘自官网

7.2.1 架构图

7.2.2 集群搭建

​ 镜像集群的搭建,是在副本集群的基础之上做的额外配置,也就是说必选先搭建好副本集群才能搭建镜像集群。这个额外配置就是需要创建一个策略。

# 0.策略说明
	rabbitmqctl set_policy [--vhost ] [--priority ] [--apply-to ]   
	--vhost:可选参数,针对指定virutal host下的queue进行设置
	--priority:可选参数,policy的优先级,越大越优先
	--appliy-to:可选参数,表示策略应用的对象,后面跟queues|exchanges|all。
	name:策略名称,唯一标识。
	pattern:匹配队列的正则表达式。
	definition:镜像定义,包括三个部分:ha-mode、ha-params、ha-sync-mode
		ha-mode:镜像队列模式,可选值:all/exaclty/nodes
			all:表示在集群的所有节点上进行镜像
			exactly:表示在指定个数的节点上进行镜像,节点个数通过ha-params指定
			nodes:表示在指定节点上进行镜像,节点名称通过ha-params指定
		ha-params:ha-mode需要用到的参数
		ha-sync-mode:队列同步方式,可选值为:automatic/manual
			automatic:自动同步
			manual:用户触发同步
# 1.查看当前策略
	rabbitmqctl list_policies
# 2.添加策略
	rabbitmqctl set_policy ha-all '^hello' '{"ha-mode":"all","ha-sync-mode":"automatic"}'
	说明:策略正则表示为"^"表示匹配所有队列,"^hello"表示匹配hello开头的队列
# 3.删除策略
	rabbitmqctl clear_policy ha-all

搭建成功:

Tips:
上文中提到的erlang和rabbitmq的安装包下载。


最后:因小的才疏学浅,如有问题,请不吝指出,感谢感谢~

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5676256.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存