1.引言 线程池的设置线程最好根据cpu核心数,最大就cpu核心数的两倍 什么是RabbitMQ?RabbitMQ是基于amqp协议,实现的一种MQ理念的服务。类似的服务 RocketMQ、ActiveMQ、Kafka等
为什么在分布式项目中需要一款消息中间件?消息中间件能够实现一些Feign(同步调用)无法实现的效果:
1、服务的异步调用
2、消息的广播(事件总线)
3、消息的延迟处理
4、分布式事务
5、请求削峰(处理高并发)
二、RabbitMQ的Docker安装1)拉取镜像
docker pull rabbitmq:3.8.5-management2)准备docker-compose模板
rabbitmq: image: rabbitmq:3.8.5-management container_name: rabbitmq ports: - 5672:5672 - 15672:15672 restart: always3)启动rabbitmq容器
docker-compose up -d rabbitmq4)访问rabbitmq的管理页面(账号:guest 密码:guest)
三、RabbitMQ常用模型1)模型一
P -> Provider(提供者)
红色方块 -> 队列(存储消息)
C -> Consumer(消费者)
2)模型二
一个提供者对应多个消费者,消息会轮训发送给两个消费者
起到一个消费端负载均衡的目的,减轻消费端的消费压力
3)模型三
发布/订阅模式 - 消息广播
多个消费者会同时收到提供者发布的消息
X -> Exchange(交换机,消息的复制转发,不能存储消息)
4)模型四
路由键 -> 交换机和队列绑定若干路由键,发布的消息可以指定路由键发送
5)模型五
通配符的路由键
6)模型六
Rabbitmq的同步调用模型
四、JavaAPI调用RabbitMQ添加相关依赖
com.rabbitmq amqp-client5.9.0 封装到工具类中
public class ConnectionUtils { //创建连接 public static ConnectionFactory connectionFactory = new ConnectionFactory(); static { connectionFactory.setHost("IP地址"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //默认的虚拟主机 connectionFactory.setVirtualHost("/"); } public static Connection getConnection(){ //获取连接 try { Connection connection = connectionFactory.newConnection(); return connection; } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return null; } }服务的提供者
public class Provider { public static void main(String[] args) throws IOException { //1.获取连接 Connection connection = ConnectionUtils.getConnection(); //2、通过连接获取通道 (后续所有的 *** 作都是基于Channel) Channel channel = connection.createChannel(); //3.创建一个队列 channel.queueDeclare("myqueue",false,false,false,null); //4.给队列发送消息 for (int i = 0;i<10;i++){ String msg = "hello RabbitMQ"; channel.basicPublish(" ","myqueue",null,msg.getBytes("utf-8")); } //5.关闭连接 connection.close(); } }服务的消费者
public class Consumer { public static void main(String[] args) throws IOException { //1.获取连接 Connection connection = ConnectionUtils.getConnection(); //2、通过连接获取通道 Channel channel = connection.createChannel(); //3.监听队列 channel.basicConsume("myqueue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者接收到的消息:"+new String(body,"utf-8")); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } }模型二
一个提供者对应多个消费者,消息会轮训发送给两个消费者
起到一个消费端负载均衡的目的,减轻消费端的消费压力
模拟多个消费者
模型三
发布/订阅模式 - 消息广播
多个消费者会同时收到提供者发布的消息
X -> Exchange(交换机,消息的复制转发,不能存储消息)
public class Provider { public static void main(String[] args) throws IOException { //1.获取连接 Connection connection = ConnectionUtils.getConnection(); //2、通过连接获取通道 (后续所有的 *** 作都是基于Channel) Channel channel = connection.createChannel(); //3.创建交换机 channel.exchangeDeclare("myexchange","fanout"); //4.给队列发送消息 for (int i = 0;i<10;i++){ String msg = "hello RabbitMQ---"+i; channel.basicPublish("myexchange","",null,msg.getBytes("utf-8")); } //5.关闭连接 connection.close(); } }public class Consumer { public static void main(String[] args) throws IOException { //1.获取连接 Connection connection = ConnectionUtils.getConnection(); //2、通过连接获取通道 Channel channel = connection.createChannel(); //3.创建队列 channel.queueDeclare("myqueue",false,false,false,null); //不限制启动顺序 channel.exchangeDeclare("myexchange","fanout"); //4.将交换机与队列进行绑定,参数1是队列,参数2是交换机,参数3是表示绑定的路由键(fanout类型的交换机无效) channel.queueBind("myqueue","myexchange",""); //5.监听队列 channel.basicConsume("myqueue",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2接收到的消息:"+new String(body,"utf-8")); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } }模型四
路由键 -> 交换机和队列绑定若干路由键,发布的消息可以指定路由键发送
五、常用方法的参数Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Maparguments) throws IOException; 参数一:queue 队列的名称
参数二:durable 是否持久化,默认为false,非持久化
参数三:exclusive 是否为排他队列,排他队列只有创建这个队列的连接可以 *** 作,其他连接不能 *** 作该队列
参数四:autoDelete 是否自动删除,默认为false,如果为true,那么当所有监听这个队列的消费端断开监听后,队列会自动删除
参数五:arguments 用来设置一个额外的参数
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Maparguments) throws IOException; 参数一:exchange 交换机的名称
参数二:type 交换机的类型(fanout|direct|topic|header)
参数三:durable 持久化,默认false
参数四:autoDelete 是否自动删除,默认为false,如果未true,所以绑定到交换机的队列,解绑后,交换机就会自动删除
参数五:internal 是否为内置交换机,默认为false,如果为true表示,当前交换机只能绑定其他交换机,提供者不能直接发消息给该交换机
参数六:arguments 用来设置一个额外的参数
六、TTL - 过期时间
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)