RabbitMQ—消息队列

RabbitMQ—消息队列,第1张

RabbitMQ—消息队列 一、RabbitMQ介绍

RabbitMQ是一个在AMQP( 一个提供统一消息服务的应用层标准高级消息队列协议)基础上完成的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

主要特性:

保证可靠性:使用一些机制来保证可靠性,如持久化、传输确认、发布确认灵活的路由功能可伸缩性:支持消息集群,多台RabbitMQ服务器可以组成一个集群高可用性:RabbitMQ集群中的某个节点出现问题时队列任然可用支持多种协议支持多语言客户端提供良好的管理界面提供跟踪机制:如果消息出现异常,可以通过跟踪机制分析异常原因提供插件机制:可通过插件进行多方面扩展 RabbitMQ逻辑结构

用户虚拟主机队列
二、RabbitMQ安装及配置(Linux) 2.1 安装前准备

如果之前安装过erlang,先删除

yum remove erlang*
安装C++编译环境
# yum -y install make gcc gcc-c++
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC unixODBC-devel httpd python-simplejson
进入/usr/local/下载erlang和rabbitMQ
# 下载erlang
wget http://www.erlang.org/download/otp_src_20.1.tar.gz

# 下载rabbitMQ
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.0/rabbitmq-server-generic-unix-3.7.0.tar.xz

在/usr/local/新建erlang指定安装文件夹

mkdir erlang
2.2 安装erlang

解压erlang安装包

tar -xvf otp_src_20.1.tar.gz

进入解压文件夹

cd otp_src_20.1

指定安装目录及安装配置(需要先安装并配置JDK)

# erlang指定安装在/usr/local/erlang目录 
./configure --prefix=/usr/local/erlang --enable-smp-support --enable-threads --enable-sctp --enable-kernel-poll --enable-hipe --with-ssl --without-javac

编译与安装

make && make install

配置erlang环境变量

vim /etc/profile

将 export PATH=$PATH:/usr/local/erlang/bin 添加到文件末尾

重新加载profile文件

source /etc/profile
2.3 安装RabbitMQ

回到/usr/local/解压RabbitMQ安装包

由于下载的安装包为xz文件,先将xz解压为tar

xz -d rabbitmq-server-generic-unix-3.7.0.tar.xz

再解压缩tar文件

tar -xvf rabbitmq-server-generic-unix-3.7.0.tar
启动RabbitMQ

进入到解压的RabbitMQ的sbin目录

cd rabbitmq_server-3.7.0/sbin

启动

./rabbitmq-server

查看进程

lsof -i:5672
2.4 启动管理界面

启动RabbitMQ的管理系统插件(在rabbitmq_server-3.7.0/sbin目录)

./rabbitmq-plugins enable rabbitmq_management
关闭防火墙(CentOS7)
#关闭防火墙 
systemctl stop firewalld
#开机禁用 
systemctl disable firewalld
#查看状态
systemctl status firewalld
云服务器放行端口15672(管理界面端口号)5672(RabbitMQ端口号)管理系统登录:访问 http://47.96.11.185:15672/
三、RabbitMQ用户管理

RabbitMQ默认提供了⼀个guests账号,但是此账号不能⽤作远程登录,也就是不能在管理系统的登录;我们可以创建⼀个新的账号并授予响应的管理权限来实现远程登录

在linux中使用命令行创建用户

## 进⼊到rabbit_mq的sbin⽬录
cd /usr/local/rabbitmq_server-3.7.0/sbin

## 新增⽤户
./rabbitmqctl add_user ytao admin123

设置用户级别

## ⽤户级别:
## 1.administrator 可以登录控制台、查看所有信息、可以对RabbitMQ进⾏管理
## 2.monitoring 监控者 登录控制台、查看所有信息
## 3.policymaker 策略制定者 登录控制台、指定策略
## 4.managment 普通管理员 登录控制台

./rabbitmqctl set_user_tags ytao administrator

然后就可以用此账号密码登录RabbitMQ管理界面访问新增用户
新建虚拟主机
用户绑定虚拟主机
四、RabbitMQ工作方式

消息通信是由两个角色完成:消息生产者(producer)和 消息消费者(Consumer)
队列中的消息只能被消费一次

4.1 简单模式

⼀个队列只有⼀个消费者
⽣产者将消息发送到队列,消费者从队列取出数据

4.2 工作模式

多个消费者监听同⼀个队列,但多个消费者中只有⼀个消费者会成功的消费消息

4.3 订阅模式

⼀个交换机绑定多个消息队列,每个消息队列有⼀个消费者监听,消息⽣产者发送的消息可以被每⼀个消费者接收

4.4 路由模式

⼀个交换机绑定多个消息队列,每个消息队列都有自己唯⼀的key,每个消息队列有⼀个消费者监听,消息生成者可以通过key指定将某消息发送到某队列

五、RabbitMQ交换机和队列管理 5.1 基于管理系统 5.1.1 创建队列

5.1.2 创建交换机

5.1.3 交换机绑定队列


5.2 基于SpringBoot
@Configuration
public class RabbitMQConfiguration {

	 //声明队列
	 @Bean
	 public Queue queue9(){
	 Queue queue9 = new Queue("queue9");
	 //设置队列属性
	 return queue9;
	 }
	 @Bean
	 public Queue queue10(){
	 Queue queue10 = new Queue("queue10");
	 //设置队列属性
	 return queue10;
	 }
	 
	 //声明订阅模式交换机
	 @Bean
	 public FanoutExchange ex5(){
	 return new FanoutExchange("ex5");
	 }
	 
	 //声明路由模式交换机
	 @Bean
	 public DirectExchange ex6(){
	 return new DirectExchange("ex6");
	 }
	 
	 //绑定队列
	 @Bean
	 public Binding bindingQueue9(Queue queue9, DirectExchange ex6){
	 return BindingBuilder.bind(queue9).to(ex6).with("k1");
	 }
	 @Bean
	 public Binding bindingQueue10(Queue queue10, DirectExchange ex6){
	 return BindingBuilder.bind(queue10).to(ex6).with("k2");
	 }
}
六、在SpringBoot应用中使用RabbitMQ 6.1 消息生产者

添加依赖 (messaging-spring for RabbitMQ)


    org.springframework.boot
    spring-boot-starter-amqp


    org.springframework.amqp
    spring-rabbit-test
    test

配置application.yml

server:
 port: 9001
spring:
 application:
 	name: producer
 rabbitmq:
 	host: 47.96.11.185
	port: 5672
	virtual-host: host1
 	username: ytao
 	password: admin123

发送消息

@Service
public class TestService {

	 @Resource
	 private AmqpTemplate amqpTemplate;
	 
		 public void sendMsg(String msg){
		 //1. 发送消息到队列
		 amqpTemplate.convertAndSend("queue1",msg);
		 
		 //2. 发送消息到交换机(订阅交换机)
		 amqpTemplate.convertAndSend("ex1","",msg);
		 
		 //3. 发送消息到交换机(路由交换机)
		 amqpTemplate.convertAndSend("ex2","a",msg);
	 
	 }
}
6.2 消息消费者

创建项目添加依赖配置yml接收消息

@Service
//@RabbitListener(queues = {"queue1","queue2"})
@RabbitListener(queues = "queue1")
public class ReceiveMsgService {

	 @RabbitHandler
	 public void receiveMsg(String msg){
	 	System.out.println("接收MSG:"+msg);
	 }
}
七、使用RabbitMQ传递对象

RabbitMQ是消息队列,发送和接收的都是字符串/字节数组类型的消息

7.1 使用序列化对象

要求:
传递的对象实现序列化接口
对应bean类implements Serializable 添加private static final long serialVersionUID = 1L;
传递的对象的包名、类名、属性名必须⼀致

消息提供者

@Service
public class MQService {

	 @Resource
	 private AmqpTemplate amqpTemplate;
	 public void sendGoodsToMq(Goods goods){
	 
	 //消息队列可以发送 字符串、字节数组、序列化对象
	 amqpTemplate.convertAndSend("","queue1",goods);
	 }
}

消息消费者

@Service
@RabbitListener(queues = "queue1")
public class ReceiveService {

	 @RabbitHandler
	 public void receiveMsg(Goods goods){
	 System.out.println("Goods---"+goods);
	 }
}
7.2 使用JSON字符串传递

要求:对象的属性名一致

消息提供者

@Service
public class MQService {

	 @Resource
	 private AmqpTemplate amqpTemplate;
	 public void sendGoodsToMq(Goods goods){
	 
	 //消息队列可以发送 字符串、字节数组、序列化对象
	 ObjectMapper objectMapper = new ObjectMapper();
	 String msg = objectMapper.writevalueAsString(goods);
	 amqpTemplate.convertAndSend("","queue1",msg);
	 }
}

消息消费者

@Service
@RabbitListener(queues = "queue1")
public class ReceiveService {

	 @RabbitHandler
	 public void receiveMsg(Goods goods){
	ObjectMapper objectMapper = new ObjectMapper();
	 Goods goods = objectMapper.readValue(msg,Goods.class);
 	System.out.println("String---"+msg);	 
 	}
}
八、消息的可靠性

" />

8.1 消息生产者服务中实现消息确认与return监听

配置application.yml

spring:
 rabbitmq:
	 publisher-/confirm/i-type: simple ## 开启消息确认模式
	 publisher-returns: true 		##使⽤return监听机制

创建消息确认配置类

@Component
public class My/confirm/iListener implements RabbitTemplate./confirm/iCallback {

	 @Autowired
	 private AmqpTemplate amqpTemplate;
	 @Autowired
	 private RabbitTemplate rabbitTemplate;
	 
	 @PostConstruct
	 public void init(){
	 rabbitTemplate.set/confirm/iCallback(this);
	 }
	 
	 @Override
	 public void /confirm/i(CorrelationData correlationData, boolean b, String s) {
		 //参数b 表示消息确认结果
		 //参数s 表示发送的消息
		 if(b){
		 System.out.println("消息发送到交换机成功!");
		 }else{
		 System.out.println("消息发送到交换机失败!");
		 //如果失败再次发送消息
		 amqpTemplate.convertAndSend("ex4","",s);
		 }
	 }
}

创建return监听配置类

@Component
public class MyReturnListener implements RabbitTemplate.ReturnsCallback {

	 @Autowired
	 private AmqpTemplate amqpTemplate;
	 @Autowired
	 private RabbitTemplate rabbitTemplate;
	 @PostConstruct
	 public void init(){
	 rabbitTemplate.setReturnsCallback(this);
	 }
	 
	 @Override
	 public void returnedMessage(ReturnedMessage returnedMessage) {
		 System.out.println("消息从交换机分发到队列失败");
		 //如果失败再次发送消息
		 String exchange = returnedMessage.getExchange();
		 String routingKey = returnedMessage.getRoutingKey();
		 String msg = returnedMessage.getMessage().toString();
		 amqpTemplate.convertAndSend(exchange,routingKey,msg);
	 }
}
8.2 消费者服务中实现手动应答ACK
@Service
@RabbitListener(queues="queue01")
public class Consumer1 {

	 @RabbitHandler
	 public void process(String msg,Channel channel, Message message) throws IOException {
	 try {
		 System.out.println("get msg1 success msg = "+msg);
		 //参数(该条消息的索引,是否批量)
		 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
		 } catch (Exception e) {
		 //参数(该条消息的索引,是否批量,是否将该消息还原到队列)
		 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
		 System.err.println("get msg1 failed msg = "+msg);
		 }
	 } 
 }
九、延迟机制

延迟队列——消息进入到队列之后,延迟指定的时间才能被消费者消费

AMQP协议和RabbitMQ队列本身是不支持延迟队列功能的,但是可以通过TTL(Time To Live)特性模拟延迟队列的功能TTL就是消息的存活时间。RabbitMQ可以分别对队列和消息设置存活时间。队列和消息设置的存活时间一到消息就会消失。

当TTL结束之后,我们可以指定将当前队列的消息转存到其他指定的队列。 9.1 使用延迟队列实现订单支付监控

指定将当前队列的消息在一定时间死亡后转存到其他指定的队列来实现消息的延迟消费

用户提交订单在30分钟内未支付则取消订单
创建路由交换机
创建消息队列
创建死信队列
队列绑定
在B服务中接收消息队列queue2的消息实现延迟消费 ⼗、消息队列作用/使用场景总结 10.1 解耦

场景说明:用户下单之后,订单系统要通知库存系统

10.2 异步

场景说明:用户注册成功之后,需要发送注册邮件及注册短信提醒

10.3 消息通信

场景说明:应用系统之间的通信,例如聊天室

10.4 流量削峰

场景说明:秒杀业务,避免高并发导致服务崩溃

10.5 日志处理

场景说明:系统中⼤量的⽇志处理

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5717034.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存