异步处理
用户注册后,需要发送邮箱和手机验证码:
传统的需要一个步骤一个步骤的完成
应用解耦
将模块与模块之间的调用相互分离,不在是模块之间的直接调用
流量削峰
场景:秒杀,抢购业务
因为流量过大,导致服务宕机,为解决这个问题,在前端加入消息队列
用户发起秒杀请求,将请求加入消息队列中,超过队列长度就抛弃,抛出秒杀失败页面。
RabbitMQ和Kafka相比:RabbitMQ更可靠 ,相比Kafka性能低。
Kafka性能高(高的离谱,压RabbitMQ一头)但是相对于来说没有RabbitMQ可靠。根据场景选择消息队列。
RabbitMQ各组件功能 RabbitMQ安装与使用想要安装RabbitMQ,必须先安装erlang语言环境,类似安装tomcat,必须先安装JDK
查看RabbitMQ匹配的Erlang版本:https://www.rabbitmq.com/which-erlang.html
RabbitMQ版本 支持的Erlang最低版本 支持的Erlang最高版本
RabbitMQ的安装:RabbitMQ高性能消息队列要安装在Linux上而不是Windows上。
erlang下载:https://dl.bintray.com/rabbitmq-erlang/rpm/erlang
socat下载:http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
RabbitMQ下载:https://www.rabbitmq.com/install-rpm.html#downloads
下载完成后 发送到Linux上。在Linux上进行安装,下方安装命令按照顺序执行。
[root@localhost opt]# rpm -ivh erlang-21.3.8.16-1.el7.x86_64.rpm [root@localhost opt]# rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm [root@localhost opt]# rpm -ivh rabbitmq-server-3.8.6-1.el7.noarch.rpm
RabbitMQ自带有一个丰富的后台管理页面,需要我们自行启动
[root@localhost opt]# rabbitmq-plugins enable rabbitmq_management
开启RabbitMQ
[root@localhost opt]# systemctl start rabbitmq-server.service
查看RabbitMQ状态
[root@localhost opt]# systemctl status rabbitmq-server.service
重启RabbitMQ
[root@localhost opt]# systemctl restart rabbitmq-server.service
停止运行RabbitMQ
[root@localhost opt]# systemctl stop rabbitmq-server.service
查看RabbitMQ相关进程
[root@localhost opt]# ps -ef | grep rabbitmq
访问RabbitMQ自带的后台页面,因为本地安装的Centos自启动有防火墙,我们要访问Linux需要先关闭防火墙或者开放端口。
[root@localhost opt]# systemctl stop firewalld
访问页面,在网址舒服Linux所在IP地址+端口号15672。
默认账号密码是guest ,guest。默认账号密码是不支持远程访问的,所以我们要新建一个用户
添加账号命令
[root@localhost opt]# rabbitmqctl add_user 自定义用户名 密码
分配超级管理员角色
[root@localhost opt]# rabbitmqctl set_user_tags 用户名 administrator
设置角色权限
[root@localhost opt]# rabbitmqctl set_permissions -p "/" 用户名 ".*" ".*" ".*"
查看角色列表
[root@localhost opt]# rabbitmqctl list_users
修改密码命令
[root@localhost opt]# rabbitmqctl change_password 用户名 要改成的密码
访问成功页面
RabbitMQ带有三个端口
5672:RabbitMQ提供给编程语言客户端链接的端口。
15672:RabbitMQ后台管理界面端口号。
25672:RabibtMQ集群用的端口号。
Java与RabbitMQ连接使用引入依赖
com.rabbitmq amqp-client5.7.3 org.slf4j slf4j-log4j121.7.25 compile org.apache.commons commons-lang33.9
配置Log4j日志文件
log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.File=rebbitmq.log log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%n log4j.rootLogger=debug, stdout,file
在IDEA中创建Java与RabbitMQ连接的工具类,因为在创建连接的时候需要选择VirtuaHost虚拟主机 刚开始安装的RabbitMQ只有一个虚拟主机 **/**我们可以自定义创建虚拟主机,可以在后台页面中创建
创建ConnectionUtls类获取连接
public static Connection getConnection() throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置连接端口号 factory.setPort(5672); //设置主机地址 factory.setHost("192.168.152.130"); //设置连接RabbitMQ的哪个虚拟主机 factory.setVirtualHost("/lyux"); //设置账号密码 factory.setUsername("lyx"); factory.setPassword("123456"); //创建连接 Connection connection = factory.newConnection(); return connection; }RabbitMQ模式
RabbitMQ一共有6个模式,先学习前五个模式,第六个模式是RPC远程调用。
前五个模式可以分为两大类:点对点模式、发布订阅模式。
点对点模式:PSP(point to point)模式包含三个角色
发布者、消息队列、接收者
队列中保存消息,直到消息被消费或者超时
每个消息发送到消息队列中进行保存,接收者从消息队列中获取消息
特点:
每个消息只有一个消费者,被消费后就不在消息队列了
发送者与接收者没有依赖关系,发送者发送的消息不管接收者接受与否都会放在消息队列中。
接收者接受消息后,需要向发送对象发送应答成功
如果需要保证每条消息都被接受就需要用到P2P模式
发布订阅模式(publish/subscribe)包含三个角色
发布者、交换机、订阅者
多个发布者将消息发送到交换机,交换机将消息分别发送到多个订阅者
特点
每个消息可以有多个订阅者
发布者与订阅者有时间上的依赖,也就是对于某一个订阅者来说,只有订阅了发布者,才能消费发布者的消息
为了消费发布者信息,订阅者需要保持运行状态
如果希望发送的一个消息被多人使用可以采用这个模式。
点对点模式之简单模式创建消息发送方
ppublic class Sender{ public static void main(String[] args){ //利用创建好的ConnectionUtls类获取连接 Connection connection = ConnectionUtils.getConnection(); //创建信道 Channel channel = connection.createChannel(); //参数1 – 队列的名称 //参数2 - 如果我们声明一个持久队列,则为真(该队列将在服务器重启后继续存在) //参数3 – 如果我们声明独占队列(仅限于此连接),则为 true //参数4 – 如果我们声明一个自动删除队列,则为 true(服务器将在不再使用时将其删除) //参数5 - 队列的其他属性(构造参数) //创建队列 channel.queueDeclare("queue1",false,false,false,null); //参数1 - 将消息发布到的交换机 //参数2 – 队列名 //参数3 - 如果要设置“强制”标志,则为 true //参数4 - 消息的其他属性 - 路由标头等 //参数5 - 消息正文 //发送信息 channel.basicPublish("","queue1",null,msg.getBytes(StandardCharsets.UTF_8)); //关闭连接 channel.close(); connection.close(); } }
创建接收方
public class Accept{ public static void main(String[] args){ //持续保持连接,不然无法接受消息 while (true) { //获取链接 Connection connection = ConnectionUtils.getConnection(); //创建信道 Channel channel = connection.createChannel(); //创建获取队列后回调的对象 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); //开启手动确认消息 channel.basicAck(envelope.getDeliveryTag(), false); } }; //获取队列内容 channel.basicConsume("queue1", false, consumer); } } }
初次创建 一定要先启动 发送方,因为先启动接收方的话 没有找到相应的队列会报错。如果想要先启动接收方的话,可以用下方的方法。
public class Accept{ public static void main(String[] args){ //持续保持连接,不然无法接受消息 while (true) { //获取链接 Connection connection = ConnectionUtils.getConnection(); //创建信道 Channel channel = connection.createChannel(); //如果队列没有被创建,就创建队列,如果已经被创建,就获取队列 channel.queueDeclare("queue1",false,false,false,null); //创建获取队列后回调的对象 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); //开启手动确认消息 channel.basicAck(envelope.getDeliveryTag(), false); } }; //获取队列内容 channel.basicConsume("queue1", false, consumer); } } }点对点模式之工作队列模式
工作队列模式就是发送方产生消息过快,而接收方接收消息慢,使消息不能及时使用,这时我们就可以多有几个接收方。
1、创建两个接收方代码相同
public class Accept{ public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); //创建信道 Channel channel = connection.createChannel(); channel.queueDeclare("queue1", false, false, false, null); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); try { //延迟时间不同,达到劳动能力不同的效果 //时间小的就是干得快的,时间慢的就是干得慢的。 Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("queue1", false, consumer); } }
先运行两个接收方,在运行发送方。运行后我们发现并没有达到干得快就干得多的效果。
官方原话如下
为了解决这个问题我们可以在接受方加入
int prefetchCount = 1; channel.basicQos(prefetchCount);
成品如下
public class Accept { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); //创建信道 Channel channel = connection.createChannel(); channel.queueDeclare("queue1", false, false, false, null); //告诉RabbitMQ 不要向接收方一次发送多条消息,只有确认一条后才发送另外一条。 channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("queue1", false, consumer); } }
此时我们就达到了能者多干的效果。
面试题:怎么避免消息堆积?
可以采用多个工作方监控发送方的消息 通过线程池 ,异步消费。 发布订阅模式
简单来说 就相当于抖音快手粉丝订阅主播,主播发送视频时,每一个粉丝都能收到,而不是只有一个人能收到消息。
相对于点对点模式来说 多了一个交换机(路由)。
创建发送方。
public class Sender { public static void main(String[] args) throws IOException, TimeoutException { String msg= "你好,世界"; //获取连接 Connection connection = ConnectionUtils.getConnection(); //创建信道 Channel channel = connection.createChannel(); //创建路由(路由名字,路由类型) //路由类型可以在后台管理页面中查看 //fanout channel.exchangeDeclare("yangmi","fanout"); //发送信息 channel.basicPublish("yangmi","",null,msg.getBytes()); channel.close(); connection.close(); } }
创建两个接收方,代码基本相同,队列名字不同
public class Accept { public static void main(String[] args) throws IOException, TimeoutException { //获取链接 Connection connection = ConnectionUtils.getConnection(); //创建信道 Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare("fensi2",false,false,false,null); //绑定交换机 channel.queueBind("fensi2","yangmi",""); //创建接收消息后的回调类 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }; //创建消费者 channel.basicConsume("fensi2",false,consumer); } }
发送方发送消息,接收方接受消息,我们发现两个接收方能接受的是同一个消息。
同一个消息能被多个消费者消费。这就是发布订阅模式
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)