我们选择在linux下安装
安装的前提需要在虚拟机下安装docker
docker pull rabbitmq:management(拉去镜像) docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management(设置容器) |
安装完成后访问我们的管理页面 http://192.168.1.111:15672/
192.168.1.111是你们设置的虚拟机地址
安装虚拟机和docker可以翻阅我的文章
显示登录界面表示成功了。账号密码默认 guest 。
2.rabbitmq的概念 2.1.1. 什么是 MQMQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常 见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不 用依赖其他服务。
2.1.2. 为什么要用 MQ1.流量消峰 举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正 常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单 *** 作系统是处理不了的,只能限 制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分 散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的 *** 作,但是比不能下单的体 验要好。 2.应用解耦 以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合 调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单 *** 作异常。当转变成基于 消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在 这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单 *** 作可以正常完成。当物流 系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。
2.2.1. RabbitMQ 的概念RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包 裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是 一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收, 存储和转发消息数据。
2.2.2. 四大核心概念生产者
产生数据发送消息的程序是生产者
交换机
交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
队列
队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
还有很多很想细节的需要大家查阅互联网的其他信息。
在这里提出一些重点和常用的点。
3.实战测试小dome 3.1准备新建一个maven工程
引入mq的依赖
com.rabbitmq
amqp-client
5.8.0
commons-io
commons-io
2.6
3.2 用代码演示生产者->队列->消费者关系 一个生产者对应一个消费者
原理图
创建实体类(以后每次的案例都会放在com.dfp.xxx包下 ,每次安排想对应的一组案例都会在同一个xxx包下)
package com.dfp.one;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消息生产者
*/
public class Producer {
//队列名称
private final static String QUEUE_NAME="dfp_1";
public static void main(String[] args) throws IOException, TimeoutException {
/**
* //创建一个工厂链接
* 设置mq安装的地址
* 登录的账号
* 登录的密码
*/
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.1.111");
factory.setUsername("guest");
factory.setPassword("guest");
//channel实现自动close接口自动关闭,不需要显示
// 创建连接
Connection connection= factory.newConnection();
// 创建信道
Channel channel= connection.createChannel();
/**
* 生成一个队列
* 1.队列名称
* 2.队列里面的消息是非持久化,默认消息存放在内存中
* 3.该队列是否提供一个消费者进行消费,是否决定共享true可以多个消费者消费
* 4.是否自动删除,最后一个消费者断开后,该队列是否自动删除。true自动删除
* 5.其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String msg="hello,dfp";
/**
* 发送一个消息
* 1.发送到交换机的位置
* 2.路由的key是哪个
* 3.其他参数
* 4.发送的消息体
*/
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());//二进制传输
System.out.println("消息发送完毕");
}
}
package com.dfp.one;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
*/
public class Consumer {
private final static String QUEUE_NAME ="dfp_1";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.111");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel= connection.createChannel();
System.out.println("等待消息接收。。。。");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback=(consumerTag,delivert)->{
String msg=new String(delivert.getBody());
System.out.println(msg);
};
//取消消费者的一个回调接口,如何在消费的时候队列删除掉了
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消息中断。。。");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
* 3.消费者未成功消费的回调
* 4.消息接收异常回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
先启动消费者程序在启动生产者
3.3 一个生产者对应多个消费者工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将d出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
说白一点,就像排队入场的时候我们只有一条队伍排队但是,电影院的入口有三或者多个个检票员
新建如下:
take是一个生产者
我们将代码重复的部分封装到一个工具类上方便运用
/**
* 发送线程
*/
public class take {
private static final String QUEUE_ANME="dfp_1";
public static void main(String[] args) throws Exception{
//创建连接
Channel channel= RabbitMqUtils.getChanne1();
//声明队列初始化
/**
* 生成一个队列
* 1.队列名称
* 2.队列里面的消息是非持久化,默认消息存放在内存中
* 3.该队列是否提供一个消费者进行消费,是否决定共享true可以多个消费者消费
* 4.是否自动删除,最后一个消费者断开后,该队列是否自动删除。true自动删除
* 5.其他参数
*/
channel.queueDeclare(QUEUE_ANME,false,false,false,null);
//控制台输入
Scanner scanner=new Scanner(System.in);
while (scanner.hasNext()){
String msg= scanner.next();
/**
* 发送一个消息
* 1.发送到交换机的位置
* 2.路由的key是哪个,
* 3.其他参数
* 4.发送的消息体
*/
channel.basicPublish("",QUEUE_ANME,null,msg.getBytes());
System.out.println("发送完毕:"+msg);
}
}
}
public class Work01 {
private final static String QUEUE_NAME ="dfp_1";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
Channel channel= RabbitMqUtils.getChanne1();
//消息的接收 回调
DeliverCallback deliverCallback=(consumerTag,msg)->{
System.out.println("接收到的消息:"+new String(msg.getBody()));
};
//接收的消息被取消执行以下内容
CancelCallback cancelCallback =(consumerTag)->{
System.out.println(consumerTag+"消息发送者取消接口回调");
};
/**
* 消费者消息
* 1.消费那个队列
* 2.消费成功后是否自动应答,true代表是
* 3.消费者未成功消费的回调
* 4.消费者取消消费的回调
*/
System.out.println("c1等待接收消息...");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
public class Work02 {
private final static String QUEUE_NAME ="dfp_1";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接
Channel channel= RabbitMqUtils.getChanne1();
//消息的接收
DeliverCallback deliverCallback=(consumerTag,msg)->{
System.out.println("接收到的消息:"+new String(msg.getBody()));
};
//接收的消息被取消执行以下内容
CancelCallback cancelCallback =(consumerTag)->{
System.out.println(consumerTag+"消息发送者取消接口回调");
};
/**
* 消费者消息
* 1.消费那个队列
* 2.消费成功后是否自动应答,true代表是
* 3.消费者未成功消费的回调
* 4.消费者取消消费的回调
*/
System.out.println("c2等待接收消息...");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
结果
细心的朋友会发现有的消息丢失了,我们一会来讲怎么解决。即使有数据的丢失但是队列和消费者的消费顺序是按照顺序轮训获取生产者发送的消息。
3.4 消息应答 (解决上述问题)概念:
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续 发送给该消费这的消息,因为它无法接收到。 为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接 收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权 衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢 失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制, 当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终 使得内存耗尽,最终这些消费者线程被 *** 作系统杀死,所以这种模式仅适用在消费者可以高效并 以某种速率能够处理这些消息的情况下使用。
消息应答的方法
A.Channel.basicAck(用于肯定确认)
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
B.Channel.basicNack(用于否定确认)
C.Channel.basicReject(用于否定确认)
与 Channel.basicNack 相比少一个参数 不处理该消息了直接拒绝,可以将其丢弃了
3.5 Multiple 的解释
手动应答的好处是可以批量应答并且减少网络拥堵multiple 的 true 和 false 代表不同意思 true 代表批量应答 channel 上未应答的消息 比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答 false 同上面相比 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
图片解释
3.5消息自动重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息 未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者 可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确 保不会丢失任何消息
代码实现
新建如下
生产者代码几乎没差别
public class take {
private static final String TACK_QUEUE_ANME="ack-queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channe1 = RabbitMqUtils.getChanne1();
//声明队列
/** 生成一个队列
* 1.队列名称
* 2.队列里面的消息是非持久化,默认消息存放在内存中
* 3.该队列是否提供一个消费者进行消费,是否决定共享true可以多个消费者消费
* 4.是否自动删除,最后一个消费者断开后,该队列是否自动删除。true自动删除
* 5.其他参数
*/
channe1.queueDeclare(TACK_QUEUE_ANME,false,false,false,null);
//获取控制台的输入消息
Scanner scanner=new Scanner(System.in);
while (scanner.hasNext()){
String msg= scanner.next();
/**
* 默认交换机
* 队列名称
* 其他参数
* 获取消息的二进制数
*/
channe1.basicPublish("",TACK_QUEUE_ANME,null,msg.getBytes("UTF-8"));//防止中文编码错误
System.out.println("产生消息"+new Date()+":" +msg);
}
}
}
设置手动应答
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
并且关闭自动应答
/**
* 消息接受者
* 消息在手动应答的时候是不会丢失,对异常情况会把消息放回队列中 重新等待
*/
public class Work03 {
private static final String TACK_QUEUE_ANME="ack-queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChanne1();
System.out.println("C1 等待接收消息处理时间较短");
//接收到消息的回调
DeliverCallback deliverCallback=(consumerTag,message)->{
SleepUtils.sleep(1);
String msg = new String(message.getBody(),"UTF-8");
System.out.println("c1接收的消息:"+new Date() +":"+msg);
//手动应答
/**
* 消息的标记tag
* 消息否定批量应答 不批量处理应答信道中的消息
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
// 未接收到消息的回调
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消费者取消消费接收接口回调");
};
boolean autoAck = false;
channel.basicConsume(TACK_QUEUE_ANME,autoAck,deliverCallback,cancelCallback);
}
}
/**
* 消息接受者
* 消息在手动应答的时候是不会丢失,对异常情况会把消息放回队列中 重新等待
*/
public class Work04 {
private static final String TACK_QUEUE_ANME="ack-queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChanne1();
System.out.println("C2 等待接收消息处理时间较长长长");
//接收到消息的回调
DeliverCallback deliverCallback=(consumerTag, message)->{
SleepUtils.sleep(30);
String msg = new String(message.getBody(),"UTF-8");
System.out.println("c2接收的消息"+new Date() +":"+msg);
//手动应答
/**
* 消息的标记tag
* 消息否定批量应答 不批量处理应答信道中的消息
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
// 未接收到消息的回调
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消费者取消消费接收接口回调");
};
boolean autoAck = false;
channel.basicConsume(TACK_QUEUE_ANME,autoAck,deliverCallback,cancelCallback);
}
}
work03和04 的对比差距在 接受到生产者的信息之后处理的时间不同
SleepUtils.sleep(30)让work04等待30执行
并且在等待的图中关闭work04如果按照轮训的方式 88信号原本应该在work04打印,当我们结束了work04 ,88信号回到队列让work03读取
3.6队列持久化应用场景:我们知道如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消
息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列 和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事: 我们需要将队列和消息都标 记为持久化 。 承接上述代码注意!!
因为在rabbitmq的服务器中如果我们已经注册的一条队列,我们需要修改他的队列状态,要删除原先的队列重新创建一条队列不然会报错!
删除队列
点击delete
重新启动take的main方法
我们发现ack-queue标记D表示队列持久化
这样就保证了队列的持久化,使得消息在队列中不会消失有了一定的保证。
3.7消息持久化既然有队列的持久化顾名思义就有消息的持久化来保证消息传输的安全不丢失,将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是 这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没
有真正写入磁盘。 要想让消息实现持久化需要在消息生产者修改代码在发,创建发送消息处修改如下 MessageProperties.PERSISTENT_TEXT_PLAIN 添 加这个属性。 4.消息分发方式 4.1不公平分发在上述的几个案例中,我们使用到的分发方式是轮训分发,意识就是在消费者获取消息的时候,是一个消费者处理消息是一个接着一个按顺序交替执行的,即使你处理的慢也是没有影响的。但是这样遇到上述的work03和work04。对于work04处理较慢的速度来说,就会使得整体的处理消息的速度是减慢的了。不公平分发的原理就是速度快的消费者会主动去帮助速度慢的消费者处理信息。
通俗易懂的原理就是,能者多劳。
因为这是处理接受信息的方式所以我们在消费者的代码上进行修改
我们修改信道的。basicQos()属性不给值默认是0,我们设为1表示不公平分发
结果
我们发现第一次发生的aa还没有处理完毕,所有速度快的c1程序处理了后续的所有信息。
4.2预取值分发按照上述的分发原则不公平分发,显然觉得太不公平。对于正常的观点来说,2个机器一个慢一个快,慢的在做一个拿快的就要全部做完了。
因此我们引入取值分发,就是我们设定消费者要处理几条数据,在消费消息堆积的时候。
比如 我让work03消费2个work04消费5个
但是这个消费的数量也不是绝对定义的而是一种动态消费能力只有在消费能力都达到上限才有明显的效果。
修改代码
其实和不公平分发的设置时一样的,当2个工作的取值相同的时候就会根据能力不同安排处理任务。
其实这个道理我理解的来看就是,从总队列中取值到规定的消费者程序中,但是由于消费者处理快慢不同,每个程序先拿了x个任务这x个任务在处理之前再进行一次排队等待。
就好像去医院挂号,在大厅排队挂号了,好了之后去医生科室排队。
5.发布确认的策略原理:
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式, 所有在该信道上面发布的 消息都将会被指派一个唯一的 ID (从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队 列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传 给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。 confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信 道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调 方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消 息,生产者应用程序同样可以在回调方法中处理该 nack 消息。 发布确认的策略有三种单个确认发布 、 批量确认发布和异步确认发布。单个确认发布 :发布一条消息确认一次,可以了解每个发布的情况但是很慢。
批量确认发布:一个发布数量确认一次,较快但是不知道发布的详细情况
异步确认发布:生产者可以不听的发布交由rabbitmq的监听器去监控,如果有消息rabbitmq会返回消息给你。
代码实现:
/**
* 消息发布确认模式
* 使用时间比较
* 1.单个确认
* 2.批量确认
* 3.异步确认
*
*/
public class MessageToConfirm {
public static final int MESSAGE_COUNT=1000;
public static void main(String[] args) throws Exception {
//* * 1.单个确认
// MessageToConfirm.oneMethod();//单个确认消耗时间846ms
//* * 2.批量确认
// MessageToConfirm.moreMethod();//批量确认消耗时间89ms
//* * 3.异步确认
MessageToConfirm.differMethod();//异步确认消耗时间50ms
}
//* * 1.单个确认
public static void oneMethod() throws Exception {
//创建信道
Channel channel = RabbitMqUtils.getChanne1();
String queue_name= UUID.randomUUID().toString();
//初始化队列
channel.queueDeclare(queue_name,true,false,false,null);
//开启发布确认
channel.confirmSelect();
long strat=System.currentTimeMillis();
//循环发送消息模拟
for (int i=0;i消息发布成功");
}
}
long end=System.currentTimeMillis();
System.out.println("单个确认消耗时间"+(end-strat)+"ms");
}
//* * 2.批量确认
public static void moreMethod() throws Exception {
//创建信道
Channel channel = RabbitMqUtils.getChanne1();
String queue_name= UUID.randomUUID().toString();
//初始化队列
channel.queueDeclare(queue_name,true,false,false,null);
//开启发布确认
channel.confirmSelect();
//设置批量尺寸,每100条确认一次
int size=100;
long strat=System.currentTimeMillis();
//循环发送消息模拟
for (int i=0;i消息发布成功");
}
}
}
long end=System.currentTimeMillis();
System.out.println("批量确认消耗时间"+(end-strat)+"ms");
}
//* * 3.异步确认
public static void differMethod() throws Exception {
//创建信道
Channel channel = RabbitMqUtils.getChanne1();
String queue_name= UUID.randomUUID().toString();
//初始化队列
channel.queueDeclare(queue_name,false,false,false,null);
//开启发布确认
channel.confirmSelect();
/**
* 线程安全有序的一个哈希表,适用于高并发的状态下
* 1.关联序号和消息 key-value
* 2.删除指定的内容
* 3.支持高并发(多线程)
*/
ConcurrentSkipListMap out=new ConcurrentSkipListMap<>();
//消息发送成功的回调函数
/**
* 1.消息的表示
* 2.批量确认
*/
ConfirmCallback ack=( tag, multiple)->{
//2.删除已经确认的消息,剩下的就是未确认的消息
if (multiple){
//如果是批量获取,批量删除
ConcurrentNavigableMap confirmed = out.headMap(tag,true);
confirmed.clear();
}else {
//不是批量,移除当前标记的信息
out.remove(tag);
}
System.out.println("已确认的消息"+tag);
};
//消息发送失败的回调函数
ConfirmCallback nac=(tag,multiple)->{
String message = out.get(tag);
//打印一下未确认的消息是那些
System.out.println("未确认消息的内容"+message+"---->未确认的消息的标记"+tag);
};
//准备消息监听器
/**
* 1.成功却的消息回调
* 2.失败消息的回调
*/
channel.addConfirmListener(ack,nac);
long strat=System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message="-->异步"+i;
channel.basicPublish("",queue_name,null,message.getBytes());
//不用设置发布确认 boolean flag=channel.waitForConfirms();
//1.记录所有要发送的消息总和
/**
* channel.getNextPublishSeqNo()获取下一个消息的序列号
* 设置信道的hash
* 1.获取信道的编号=key
* 2.消息内容=value
*/
out.put(channel.getNextPublishSeqNo(),message);
}
long end=System.currentTimeMillis();
System.out.println("异步确认消耗时间"+(end-strat)+"ms");
}
}
资料来源于尚硅谷,尚硅谷学习笔记更新ing.....
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)