消息(message):是指在应用间传送的数据,消息可以非常简单,比如包含文本文字字符串,也可以更复杂,可能嵌入对象。
消息队列:(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递,消息发布者只管把消息发送到MQ中,而不用管谁来取,消息使用者只管从MQ中取消息而不管是谁发布的,这样发布者和使用者都不用知道对方的存在。(业务是队列所以应该满足,先进先出,后进后出,消息的使用者取消息的时候也是从前到后按顺序,)
1.2为什么要使用消息队列 从上面的描述中可以看出消息队列是一种应用间的异步协作机制,那什么时候需要使用MQ呢?
常见的订单系统为例,用户点击下单按钮之后的业务逻辑可能包括:扣减库存,生成相应的单据,发红包,发短信通知,在业务发展的初期可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的 *** 作分析出来异步执行,比如发红包,发短信通知等,这种场景下就可以用MQ让主流程快速完结,而另外的单独的线程拉取MQ的消息(或者有MQ推送消息),当发现MQ中有红包或发短信之类的消息时,执行相应的业务逻辑。
以上是用于业务解耦的情况,其他常见场景包括最终一致性,广播,错峰流控等等。
(将后续的业务写入队列,消息的使用者读取队列,然后完成后面的业务,总的来说为了提高效率)
1.3 RabbitMQ特点 RabbitMQ 是由Erlang 语言开发的AMQP 的开源实现。
AMQP :Advanced MEssage Queue 高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件,基于此协议的客户端与消息中间件可传递消息,并不受产品,开发语言等条件的限制。
RabbitMQ 最初起源于金融系统,用于分布式系统中存储转发消息,在易用性,扩展性,高可用性等方面表现不俗,具体特点包括:
1.可靠性(Reliability):
RabbitMQ使用一些机制来保证可靠性,,如持久化,传输确认,发布确认
2.灵活的路由(Flexible Routing):
在消息进入队列之前,通过Exchange来路由消息的,对于典型的路由功能,RabbitMQ已经提供了一些内置的Exchange来实现,针对复杂的的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的Exchange。
3.消息集群(Clustering)
多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker
4.高可用
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
5.多种协议:
RabbitMQ支持多种消息队列协议,比如STOMP ,MQTT
6.管理界面
RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息Broker的许多方面
7.多语言客户端
RabbitMQ 几乎支持所有常用的语言,比如java NET Ruby等
第二章RabbitMQ的安装 一般来说要安装RabbitMQ之前要安装一个Erlang ,可以去Erlang官网下载,接着去Rab官网下载安装包,解压即可
Erlang官网:https://www.erlang.org/downloads
RabbitMQ: https://www.rabbitmq.com/download.html
2.1 安装之前的准备 2.1.1依赖包安装安装RabbitMQ之前必须要先安装所需要的依赖包可以使用下面的一次性安装命令:
启动服务:
rabbitmq-server start
结束服务
rabbitmqctl stop
第三章RabbitMQ消息发送和接收 3.1 RabbitMQ的消息发送和接收机制 所有的MQ产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列,生成者(producer)创建消息,然后发布到队列中(queue)。,最后将消息发送到监听的消费者。
上面的是MQ的基本的抽象模型,但是不同的MQ产品有着不同的机制,RabbitMQ实际基于AMQP协议的一个开源实现,因此RabbitMQ内部也是AMQP的基本概念
RabbitMQ的内部接收如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wUutjKLq-1643252252473)(C:UsersGJXAppDataRoamingTyporatypora-user-imagesimage-20211206234432963.png)]
Publisher:生成者,将消息发送到Exchange中
Broker:消息队列实体(比如启动的RabbitMQ,也就是消息服务器)
一个Broker中可以有多个Virtual Host(文件夹)
一个Virtual Host有若干个Exchange(交换机)
Binding:绑定
交换机绑定队列
consumer:消费者
Connection:网络连接:,比如一个tcp连接
消费者连接一个Connection,一个Connection中有若干个Channel(通道。双向通道,可读可写)
通道会连接到某一个队列,消费者通过通道去获取队列中的消息
3.2 AMQP中的消息路由 AMQP中消息的路由过程和java开发过程和 Java 开发者熟悉的JMS存在一些差别,AMQP中增加了Exchange 和 Binding 的角色。生产者就把消息发布到Exchange上,消息最终到达队列并被消费者接受,而Binding 决定交换器的消息应该发送到那个队列。
Producer Consumer
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DhkxmxvL-1643252252475)(C:UsersGJXAppDataRoamingTyporatypora-user-imagesimage-20211207110540542.png)]
生产者将消息发布到交换机,由交换机根据消息中的信息决定将消息发布到那个队列上
3.3 Exchange的类型 Exchange分发消息时,根据类型的不同分发的策略有区别,目前共四种类型:direct,fanout,topic,headers,
headers匹配AMQP消息的header而不是路右键,此外headers交换器和direct交换器完全一致,但是性能差很多,目前几乎用不到了,所以直接看另外三种模式。
direct单播
消息中的路由键(routing key)如果和Binding中的binding key一致,交换器就将消息发送到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路右键为dog,则只转发routing key 标记为dog 的消息,不会转发dog.puppy,也不会转发dog.guard等等,他是完全匹配,单播的模式。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DluvsCVv-1643252252475)(C:UsersGJXAppDataRoamingTyporatypora-user-imagesimage-20211207145901749.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NpDZLZ7J-1643252252476)(C:UsersGJXAppDataRoamingTyporatypora-user-imagesimage-20211207151323028.png)]
过程描述:生产者将消息(数据:李四 routingKey :345)发送到一个消息服务器(Broker)进入到交换机(Exchange),然后通过绑定规则(Binding)将不同的消息发送给队列(queue),通过绑定规则(bindingkey)绑定队列,通过bindingkey和routingkey的匹配确定
direct交换机会根据消息的routingkey的内容精准匹配将消息发送给与routingkey完全一致的queue中bindingkey(一一对应的)
消费者只需要监听某个队列后,就可以获取队列中的消息。
fanout 每个发到fanout类型交换器的消息都会分到所有绑定的队列上去,fanout交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像网广播,每台子网内的主机都获得了一份复制消息。fanout类型转发消息是最快的。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yHxKOeQJ-1643252252476)(C:UsersGJXAppDataRoamingTyporatypora-user-imagesimage-20211207153505872.png)]
过程描述:消息生产者将消息发送到交换机,由于没有bindingkey,数据会分发给所有的队列,不同的消费者监听不同的队列都能拿到数据
fanout交换:是一种广播模式,消息是一对多的,这种模式中没有routingkey和bindingkey的概念,Bindings只是简单将消息与交换机进行绑定,如果这个消息进入到交换机中,那么这个消息会被转发到所有与当前交换机进行绑定的所有队列中,但是这种模式就和我们收看电视节目一样,必须先在消费中监听队列,就像我们一定要在节目开始之前先打开电视调节到相应的频道一样,否则如果消息先发送那么消费者将永远错过这条消息,就像我们错过电视节目一样。
这种模式的交换机,适合使用在一些消息数据不是很重要的应用中,用户接受到,或者接受不到都无所谓的场景,例如手机app的消息推送
fanout虽然会丢失消息,但是他的速度是最快的
topictopic交换器通过匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路右键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号※。#匹配0个或多个单词,*匹配不多不少一个单词。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UYvrLGY1-1643252252477)(C:UsersGJXAppDataRoamingTyporatypora-user-imagesimage-20211207155831768.png)]
过程描述:生成者将消息发送到交换机,routingkey和bindingkey匹配现在加了通配符可以发送到几个队列中不再是一对一,消费者与队列绑定获取消息
topic交换机:基本概念和使用与Fanout是相同的但是topic需要指定Bindingkey,消息中也需要携带RoutingKey,但是topic中Bindingkey是可以使用通配符的 星号匹配一个必须有,井号匹配0个或者多个
他也会丢失消息,也应该启动消费者来监听队列
java使用RabbitMQ direct方式生成者(消息的发送方)
public class DiractSend { public static void main(String[] args) throws IOException, TimeoutException { //声明一个连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1");//指定ip connectionFactory.setPort(5672);//指定端口 connectionFactory.setUsername("guest");//指定账户 connectionFactory.setPassword("guest");//指定密码 Connection connection=null;//定义连接 Channel channel=null;//定义通道 try { connection=connectionFactory.newConnection();//获取连接 channel=connection.createChannel();//获取通道 channel.queueDeclare("myDirectQueue",true,false,false,null); channel.exchangeDeclare("directExchange","direct",true); channel.queueBind("myDirectQueue","directExchange","directRoutingKey"); String message="direct测试消息"; channel.basicPublish("directExchange","directRoutingKey",null,message.getBytes("utf-8")); System.out.println("消息发送成功!"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally { if (channel!=null){ channel.close(); } if (connection!=null){ connection.close(); } } } }
消费者(消息的接收方)
public class DirectConsumer { public static void main(String[] args) { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); Connection connection=null; Channel channel=null; try { connection=connectionFactory.newConnection(); channel=connection.createChannel(); channel.queueDeclare("myDirectQueue",true,false,false,null); channel.exchangeDeclare("directExchange","direct",true); channel.queueBind("myDirectQueue","directExchange","directRoutingKey"); channel.basicConsume("myDirectQueue",true,new DefaultConsumer(channel){ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message=new String(body); System.out.println("消费者========"+message); } }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { } } }fanout方式
因为是fanout方式所有消费者(消息的接收方)要先监听队列,因为是fanout是广播方式所以存在多个消费者
注意:由于使用了Fanout的交换机,因此消息的接收方可能会有多个,因此不建议在消息发送时来创建队列
以及绑定交换机建议在消费者中创建队列并绑定交换机,但是在消息发送时,至少确保交换机存在
消费者1:
public class FanoutConsumer1 { public static void main(String[] args) { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); Connection connection=null; Channel channel=null; try { connection=connectionFactory.newConnection(); channel=connection.createChannel(); String queuename=channel.queueDeclare().getQueue(); channel.exchangeDeclare("FanoutExchange","fanout",true); //将这个随机的队列绑定到交换机中,由于是fanout类型的交换机,因此不需要指定routingkey绑定 channel.queueBind(queuename,"FanoutExchange",""); channel.basicConsume(queuename,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message=new String(body); System.out.println("01消费者======="+message); } }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { } } }
消费者2:
public class FanoutConsumer2 { public static void main(String[] args) { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); Connection connection=null; Channel channel=null; try { connection=connectionFactory.newConnection(); channel=connection.createChannel(); String queuename=channel.queueDeclare().getQueue(); channel.exchangeDeclare("FanoutExchange","fanout",true); //将这个随机的队列绑定到交换机中,由于是fanout类型的交换机,因此不需要指定routingkey绑定 channel.queueBind(queuename,"FanoutExchange",""); channel.basicConsume(queuename,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message=new String(body); System.out.println("02消费者======="+message); } }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { } } }
生产者(消息的发送方):
public class FanoutSend { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); Connection connection=null; Channel channel=null; try { connection=connectionFactory.newConnection(); channel=connection.createChannel(); channel.exchangeDeclare("FanotExchange","fanout",true); String message="fanout的测试消息!"; channel.basicPublish("FanoutExchange","",null,message.getBytes("utf-8")); System.out.println("消息发送成功"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { if (channel!=null){ channel.close(); } if (connection!=null){ connection.close(); } } } }topic方式
和fanout一样因为接收方有多个,所以不在生产者(消息的提供者)声明队列,只声明交换机
生产者(消息的发送方)
public class TopicSend { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); Channel channel=null; Connection connection=null; try { connection=connectionFactory.newConnection(); channel=connection.createChannel(); channel.exchangeDeclare("topicExchange","topic",true); String message="topic的测试消息!"; channel.basicPublish("topicExchange","aa",null,message.getBytes("utf-8")); System.out.println("消息发送成功"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { if (channel!=null){ channel.close(); } if (connection!=null){ connection.close(); } } } }
消费者1(消息的接收方):
public class TopicConsumer { public static void main(String[] args) { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); Channel channel=null; Connection connection=null; try { connection=connectionFactory.newConnection(); channel=connection.createChannel(); channel.queueDeclare("topicQueue1",true,false,false,null); channel.exchangeDeclare("topicExchange","topic",true); channel.queueBind("topicQueue1","topicExchange","aa"); channel.basicConsume("topicQueue1",true,"",new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message=new String(body); System.out.println("1fanout消费者aa======="+message); } }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { } } }
消费者2(消息的接收方):
public class TopicConsumer2 { public static void main(String[] args) { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); Channel channel=null; Connection connection=null; try { connection=connectionFactory.newConnection(); channel=connection.createChannel(); channel.queueDeclare("topicQueue2",true,false,false,null); channel.exchangeDeclare("topicExchange","topic",true); channel.queueBind("topicQueue2","topicExchange","aa.*"); channel.basicConsume("topicQueue2",true,"",new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message=new String(body); System.out.println("21fanout消费者aa.*======="+message); } }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { } } }
消费者3(消息的接收方):
public class TopicConsumer3 { public static void main(String[] args) { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setPassword("guest"); connectionFactory.setUsername("guest"); Channel channel=null; Connection connection=null; try { connection=connectionFactory.newConnection(); channel=connection.createChannel(); channel.queueDeclare("topicQueue3",true,false,false,null); channel.exchangeDeclare("topicExchange","topic",true); channel.queueBind("topicQueue3","topicExchange","aa.#"); channel.basicConsume("topicQueue3",true,"",new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message=new String(body); System.out.println("3fanout消费者aa.#======="+message); } }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { } } }topic和fanout使用场景对比
topic类型的交换机也是消息一对多的一种交换机类型,它和fanout都能实现一个消息同时发送到多个队列
fanout更适于使用同一个功能不同的进程来获取数据,例如手机App中,的消息推送,一个App可能会含有多个用户来进行
安装然后他们都会启动一个随机的队列来接收着自己的数据。
topic更适合不同的功能模块来接收同一个消息,例如商城下单成功以后需要发送消息到队列中,例如Routingkey为我们的order.success,物流系统监听订单order.wuliu发票系统监听order.fapiao
topic可以使用一个随机的队列名,也可以使用一个明确的队列名,但是如果应用在和订单有关的功能中,建议使用明确的队列名称并且
要求为持久化的队列
事务消息事务消息与数据库的事务类似,只是MQ中的消息是要保证消息是否会全部发送成功,防止丢失的一种策略
RabbitMQ有两种方式来解决这个问题
1.通过AMQP提供的事务机制实现;
2.使用发送者确认模式实现;
事务的使用事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
1. channel.txSelect()声明启动事务模式; 2. channel.txCommint()提交事务 3. channel.txRollback()回滚事务
生成者(消息发送者):
编写public class Consumer { public static void main(String[] args) { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); Connection connection=null; Channel channel=null; try { connection=connectionFactory.newConnection(); channel=connection.createChannel(); channel.queueDeclare("transactionQueue",true,false,false,null); channel.exchangeDeclare("directTransactionExchange","direct",true); channel.queueBind("transactionQueue","directTransactionExchange","transactionRoutingKey"); channel.txSelect(); channel.basicConsume("transactionQueue",true,new DefaultConsumer(channel){ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message=new String(body); System.out.println("消费者========"+message); } }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { } } }
消费者(消息的接受者):
public class Consumer { public static void main(String[] args) { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); Connection connection=null; Channel channel=null; try { connection=connectionFactory.newConnection(); channel=connection.createChannel(); channel.queueDeclare("transactionQueue",true,false,false,null); channel.exchangeDeclare("directTransactionExchange","direct",true); channel.queueBind("transactionQueue","directTransactionExchange","transactionRoutingKey"); channel.txSelect(); channel.basicConsume("transactionQueue",true,new DefaultConsumer(channel){ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message=new String(body); System.out.println("消费者========"+message); } }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { } } }
事务暂时对接收者没有影响
消息的发送类 消息发送者的确认模式/confirm/i发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的,最终达到确保所有的消息全部发送成功。
/confirm/i三种实现方式:
普通确认(channel.waitFor/confirm/is)
public class Send { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); Connection connection=null; Channel channel=null; try { connection=connectionFactory.newConnection(); channel=connection.createChannel(); channel.queueDeclare("/confirm/iQueue",true,false,false,null); channel.exchangeDeclare("direct/confirm/iExchange","direct",true); channel.queueBind("/confirm/iQueue","direct/confirm/iExchange","/confirm/iRoutingKey"); String message="普通发送者确认模式的测试消息"; //启动发送者确认模式 channel./confirm/iSelect(); channel.basicPublish("direct/confirm/iExchange","/confirm/iRoutingKey",null,message.getBytes("utf-8")); //阻塞线程等待服务器返回响应,用于确认是否消息发送成功,如果确认消息发送完成则返回true, // 否则返回false,可以为这个方法指定一个毫秒值,用于确定我们需要等待服务器确认的超时时间, // 如果超出了指定的时间以后会抛出异常,表示服务器出现了问题,需要补发消息或者将消息缓存到redis中, // 稍后利用定时任务补发,无论返回false还是抛出异常,消息都有可能发送成功也有可能没有发送成功 //如果我们要求这个消息一定要发送到队列,例如订单数据,那么我们可以采用消息补发 //所谓的补发就是重新发送,可以使用递归或利用redis加定时任务来完成补发 boolean b = channel.waitFor/confirm/is(); System.out.println("消息发送成功!"+b); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (channel!=null){ channel.close(); } if (connection!=null){ connection.close(); } } } }
批量确认
public class Send { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); Connection connection=null; Channel channel=null; try { connection=connectionFactory.newConnection(); channel=connection.createChannel(); channel.queueDeclare("/confirm/iQueue",true,false,false,null); channel.exchangeDeclare("direct/confirm/iExchange","direct",true); channel.queueBind("/confirm/iQueue","direct/confirm/iExchange","/confirm/iRoutingKey"); String message="普通发送者确认模式的测试消息"; //启动发送者确认模式 channel./confirm/iSelect(); channel.basicPublish("direct/confirm/iExchange","/confirm/iRoutingKey",null,message.getBytes("utf-8")); channel.waitFor/confirm/isOrDie(); System.out.println("消息发送成功!"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { if (channel!=null){ channel.close(); } if (connection!=null){ connection.close(); } } } }
异步监听确认
public class Send { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); Connection connection=null; Channel channel=null; try { connection=connectionFactory.newConnection(); channel=connection.createChannel(); channel.queueDeclare("/confirm/iQueue",true,false,false,null); channel.exchangeDeclare("direct/confirm/iExchange","direct",true); channel.queueBind("/confirm/iQueue","direct/confirm/iExchange","/confirm/iRoutingKey"); String message="普通发送者确认模式的测试消息"; //启动发送者确认模式 channel./confirm/iSelect(); channel.add/confirm/iListener(new /confirm/iListener() { //消息确认以后的回调方法 //参数一:被确认的消息的编号,从1开始自动递增,用于标记当前是第几个消息 //参数二:当前消息是否同时确认了多个 //注意:如果参数二为true,则表示本次同时确认了多条消息,小于等于当前参数1(消息编号)的所有消息 //全部被确认,如果是false表示只确认多个当前编号的消息 public void handleAck(long l, boolean b) throws IOException { System.out.println("消息被确认了------消息编号"+l+" 是否确认多条"+b); } //消息没有确认的回调方法 //如果这个方法被执行,表示当前的消息没有被确认,需要进行消息补发 //参数一:没有被确认的消息的编号,从1开始自动递增,用于标记当前是第几个消息 //参数二:当前消息是否同时没有确认的多个消息 //注意:如果参数2 为true 则表示小于当前编号的所有的消息可能没有发送成功需要进行消息补发 //如果参数2为false则表示当前编号的消息没法发送成功需要补发 public void handleNack(long l, boolean b) throws IOException { System.out.println("消息没有被确认=======消息编号"+l+" 是否没有确认多条"); } }); for (int i=0;i<10000;i++){ channel.basicPublish("direct/confirm/iExchange","/confirm/iRoutingKey",null,message.getBytes("utf-8")); } System.out.println("消息发送成功!"); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { if (channel!=null){ channel.close(); } if (connection!=null){ connection.close(); } } } }消费者确认模式 第四章SpringBoot集成RabbitMQ 第五章RabbitMQ集群
普通模式()对于Queue来说,消息实体只存在于其中一个节点A/B两个节点仅有相同的元数据,即队列结构,(交换机的所有元数据在所有节点上是一致的,而队列的完整信息只有在创建他的节点上,各个节点仅有相同的远数据,即队列结构)。当消息进入A节点的Queue中后,consumer从B节点拉取数据时,RabbitMQ会临时在A B间进行消息传输把A中的消息实体取出并经过B发给consumer所有consumer应尽量连接每个节点,从中取出消息即对于同一个逻辑队列要在多个节点建立物理Queue否则无论consumer连A或B出口总在A,会产生瓶颈,该模式存在一个问题就是当A节点故障后,B节点无法读取到A节点中还未消费的信息实体,如果做个消息持久化,那么等A节点恢复,然后才可以被消费,如果没有做持久化,然后就该模式非常适合非持久队列,只有该队列是非持久化,客户端才能重新连接到集群中的其他节点,并且重新创建队列,如果该队列是持久化的,那么唯一的办法就是将故障节点恢复起来
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8kQfrZDV-1643252252477)(C:UsersGJXAppDataRoamingTyporatypora-user-imagesimage-20211209215555814.png)]
镜像模式(高可用模式)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QectEWs9-1643252252478)(C:UsersGJXAppDataRoamingTyporatypora-user-imagesimage-20211209215933222.png)]
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)