- RabbitMQ
- 听课笔记
- 一、优点:
- 二、市面上常用的MQ:
- 三、访问web管理工具(本地与远程):
- 四、消息应答(确认收到消息或消费完消息的回复)
- 1.自动应答
- 2.手动应答(最佳选择)
- 3.消息重新入列
- 五、消息的分发机制:
- 1.轮训分发(公平分发):
- 2.不公平分发(建议使用):
- 3.预取值分发(建议使用):
- 六、生产者的发布确认:
- 1.单次发布确认:
- 2.批量发布确认:
- 3.异步发布确认:
- 4.高并发集合工具类**ConcurrentSkipListMap**和**ConcurrentHashMap**
- 七、交换机:
- 1.概念与作用:
- 2.fanout(扇形广播发布)
- 3.direct exchange(直接交换)
- 4.Topics exchange(主题交换机,功能最强大)
- (1)Topic 的要求(routingKey命名规则)
请求不直接传到后台,而是先经过rabbitMq队列,通过队列传输再传到服务器,因此给了rabbitMQ *** 作的空间
(1)可以有削峰的作用:当某段时间请求突然变多,超出了服务器的承载范围,但因为有rabbitmq限制传给服务器请求的数量,多余的存在消息队列中排队,可以达到削峰的目的。
(2)应用解耦:就如同下单系统,里面会调用多个其它系统的接口,如其中一个系统报错,则整个环节就出问题,当如果有rabbitmq,订单系统直接把订单信息放在队列中,队列返回一个成功信息,这时订单系统的任务已经完成了,后面有没有报错都和他没关系。至于后面的执行,就是队列不断监督后续环节的进行。
(3)异步处理:当一个接口方法执行会很久,但主程序想准确知道他的执行时间,拿到执行结果,这可以通过消息队列实现:
二、市面上常用的MQ:(1)activeMQ:现在他们的团队不多维护。
(2)kafka:吞吐量大,但有缺点,就是不对消息消费错误处理(消息处理报错就消失)。
(3)RocketMQ:阿里巴巴出品,用Java语言实现,只支持Java和C++使用。
(4)RabbitMQ:公司常用,最主流的消息中间件之一。
三、访问web管理工具(本地与远程):按照老师的步骤,启动web管理工具后,网页可以用localhost打开,但不能远程打开。
(tips:15672端口是访问web管理工具的,5672是用来给rabbitmq队列传消息的端口)
远程访问不了的解决方法:
1.服务器命令行上关闭防火墙或开放15672端口;
2.如果用的是阿里云服务器,要去实例服务器上配置一下安全组,开放一下端口(这是阿里云的保护机制),顺便开放一下5672的端口,下面连接rabbitMQ会用到:
四、消息应答(确认收到消息或消费完消息的回复)保证传到消费机上的消息不会被丢失
1.自动应答实际的意义是指:消费服务器接受到消息后返回消息消费成功的信息给rabbitMQ,让rabbitMQ对确认消费的消息进行删除,注意这里是接受到数据后直接返回消费成功的信息,如果此时这一个消息消费较长时间中服务器宕机了,rabbitMQ早早此信息删除了(因消费机返回消费成功信息),这就发生了信息丢失问题,而且还会发生消息堆积。
所以,这里的自动应答应配合良好的服务器环境,但事实上你也不能确保你的服务器不会发生问题,所以最好配置手动应答。
在每次消费完信息后调用消息应答,即在deliverCallback方法执行最后调用应答方法。multiple参数是指是否批量应答,同在一个channel的消息共用一次应答(可能会发生消息丢失,不建议使用,可适用于高并发但不重要的场合)
其它的应答方法:
在消费机获取消息后因发生故障(如宕机等),未将ack消息(消费成功消息)发送到rabbitMQ中(其中检测消费机发生故障可能采用了心跳机制,检测故障与接受到成功消息的时间没有关系)。rabbitMq就会将此消息重新入列,交给其它消费机处理。
五、消息的分发机制: 1.轮训分发(公平分发):没设置之前,默认为轮训分发,即prefetchCount默认为0
此现象针对与同一队列中有多个消费者,轮训分发指每个消费者顺序的获取消息,且只有上一个消费者获取消息后消费完返回ack消费信息(消费成功信息)后才会给下一个消费者传递下一个消息。
2.不公平分发(建议使用):在多个效率不同的消费机的情况下,上面的公平分发不能充分利用服务器的资源,所以有不公平分发:谁做的快谁发的多;意思是队列会找空闲的消费机派发消息,那做的快点的消费机自然做的多。
3.预取值分发(建议使用):这是不公平分发的升级版,因为消息的发送本身需要时间(rabbitmq通常装在其它服务器,消息发送通过网络),所以会有延迟时间,所以这里设置了预取值,发送多条消息缓存在消费机上,当消费机有一条消息被消费,队列接收到消费成功的消息后会向消费机发送一条消息,但消费机缓存的消息总数始终不会高于预取值。
注意:因为自己用的服务器自然知道它的性能,程序员应根据现实情况设置预取值,如果预取值设置过大,服务器效率低,就会造成预取值后面的消息长时间没有消费,会造成部分消息消费时间长,所以现实中程序员应根据服务器性能设置预取值。
六、生产者的发布确认:为确认生产者发送的消息是否被RabbitMQ接收到,好让生产者及时做出信息重新发送的准备,因此有发布确认。
1.单次发布确认:即每次发送一条数据,就确认一下RabbitMQ是否接收到,发送时间较长。
public static void publicMessageIndividually(int num) throws Exception { Channel channel = RabbitMQConnect.getChannel(); String queueName= UUID.randomUUID().toString(); channel.queueDeclare(queueName,true,false,false,null); channel./confirm/iSelect(); long begin = System.currentTimeMillis(); for (int i=0;i2.批量发布确认: 这里步骤和单次确认发布是差不多的,不同的点是我们要靠代码控制确认发布的频率。但不能保证监控所有信息是否被接收
public static void publicMessageBench(int num,int target) throws Exception { Channel channel = RabbitMQConnect.getChannel(); //开启发布确认模式 channel./confirm/iSelect(); String queueName=UUID.randomUUID().toString(); channel.queueDeclare(queueName,false,false,true,null); long begin = System.currentTimeMillis(); for (int i=0;i3.异步发布确认: 其实这里实质就是一个线程只管发消息,开另一个线程监听消息成功(失败)发送的消息从而做出相应的行为,这样发信息就快很多,因为它不需要等待结果,另一个线程只管慢慢监测消息发送的结果,失败则做出相应补救。
public static void publicMessageAsync(int num) throws Exception { Channel channel = RabbitMQConnect.getChannel(); String queueName=UUID.randomUUID().toString(); channel.queueDeclare(queueName,false,false,false,null); channel./confirm/iSelect(); ConcurrentSkipListMap map=new ConcurrentSkipListMap(); /confirm/iListener callBack = new /confirm/iListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { if (multiple){ //将序号小于或等于当前消息的序号的消息都清空 ConcurrentNavigableMap confirm = map.headMap(deliveryTag,true); /confirm/i.clear(); }else { //将当前的消息删除掉 map.remove(deliveryTag); System.out.println("确认收到信息"+deliveryTag); } } //没有接收到确认消息的回调 @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("信息+"+deliveryTag+"未被确认"); } }; channel.add/confirm/iListener(callBack); long begin = System.currentTimeMillis(); for (int i=0;i 4.高并发集合工具类ConcurrentSkipListMap和ConcurrentHashMap 上面用了高并发工具集合类ConcurrentSkipListMap,线程安全有序的一个哈希表,适用于高并发的情况,内部数据结构用了跳表。其实ConcurrentHashMap这里也适用,同样是线程安全的,同样数据结构适合高并发的场合(高数据吞吐量),但ConCurrentSkipListMap在更多线程同时访问时更有优势,线程同时访问的数量几乎ConCurrentSkipListMap没有影响。
七、交换机: 1.概念与作用:交换机起到选择对某队列添加消息的作用:
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产
者甚至都不知道这些消息传递传递到了哪些队列中。
相反,生产者只能将消息发送到交换机**(exchange)**,交换机工作的内容非常简单,一方面它接收来
自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消
息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
注意:一个队列中的一条消息仅能被消费一次,这是永恒不变的原则
2.fanout(扇形广播发布)此类交换机收到消息后会将此消息添加到它绑定的所有队列。它所有队列的routeKey为空。
下面是代码例子:
1.生产者:
public class Product { private static final String EXCHANGE_NAME="logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQConnect.getChannel(); //创建了fanout类型exchange channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); //开启发布确认 channel./confirm/iSelect(); //异步发布确认监听器 /confirm/iListener confirmListener =new /confirm/iListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("消息"+deliveryTag+"已成功接收"); } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("消息"+deliveryTag+"未被成功接收"); } }; channel.add/confirm/iListener(/confirm/iListener); Scanner scanner =new Scanner(System.in); while (scanner.hasNextLine()){ String s = scanner.nextLine(); channel.basicPublish(EXCHANGE_NAME,"",null,s.getBytes()); } } }2.两个消费者代码一样:
public class Consumer1 { private static final String EXCHANGE_NAME="logs"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQConnect.getChannel(); //建立fanout exchange channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String queue = channel.queueDeclare().getQueue(); //fanout中的routingKey都为"",所以都可以收到 channel.queueBind(queue,EXCHANGE_NAME,""); System.out.println("Consumer1开始接收消息_____________"); DeliverCallback deliverCallback=(consumerTag,message) ->{ String string=new String(message.getBody()); System.out.println("成功接收到消息"+string); channel.basicAck(message.getEnvelope().getDeliveryTag(),false); }; CancelCallback cancelCallback =(consumerTag)->{ System.out.println("消息("+consumerTag+")取消了"); }; channel.basicConsume(queue,false,deliverCallback,cancelCallback); } }3.direct exchange(直接交换)这里是交换机根据routingKey寻找队列添加消息,与fanout的广播式不同,它是有目的根据routingkey去找的。
注意:
1.一个队列可以有多个routingKey,即可以被多个交换机绑定
2.一个交换机可以用一个routingkey绑定多个队列,这里有点像fanout。
代码例子:
1.生产者:
public class Product { private static final String EXCHANGE_NAME="direct"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQConnect.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //开启确认发布 channel./confirm/iSelect(); //定义监听器 /confirm/iListener confirmListener =new /confirm/iListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("消息"+deliveryTag+"已成功接收"); } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("消息"+deliveryTag+"未被成功接收"); } }; //添加异步发布确认监听器 channel.add/confirm/iListener(/confirm/iListener); Scanner scanner =new Scanner(System.in); while (scanner.hasNextLine()){ String s = scanner.nextLine(); //给routingKey为”queue1“的队列添加信息 channel.basicPublish(EXCHANGE_NAME,"queue1",null,s.getBytes()); } } }2.消费者代码和上一节的消费者大致相同,除了:
//指定使用临时队列 String queue = channel.queueDeclare().getQueue(); //创建direct类型的exchange channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //将queue指定routingKey绑定到direct交换机上 channel.queueBind(queue,EXCHANGE_NAME,"queue2");4.Topics exchange(主题交换机,功能最强大)它集合了上面两个交换机的功能,给指定routingKey寻找赋予了类似字符串的正则表达式寻找。
(1)Topic 的要求(routingKey命名规则)发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节。 在这个规则列表中,其中有两个替换符是大家需要注意的
*(星号)可以代替一个单词
#(井号)可以替代零个或多个单词
上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的
quick.orange.rabbit 被队列 Q1Q2 接收到
lazy.orange.elephant 被队列 Q1Q2 接收到
quick.orange.fox 被队列 Q1 接收到
lazy.brown.fox 被队列 Q2 接收到
lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit 是四个单词但匹配 Q2
2.代码例子:
生产者:
channel.basicPublish(EXCHANGE_NAME,"quick.orange.rabbit ", null,message.getBytes("UTF-8"));消费者
//声明 Q1 队列与绑定关系 String queueName="Q1"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");欢迎分享,转载请注明来源:内存溢出
评论列表(0条)