MQ(message queue),从字面意思来看,本质是个队列,FIFO先入后出,只不过对俄中存放的内容是message而已,还是一种跨进程的通信机制,用于上下游传递信息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。
为什么要用MQ 1、流量消峰举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段的下单时绰绰有余,正常时段我们下单一秒就能返回结果。但是在高峰期,如果有两万次下单 *** 作系统是处理不了的,只能限制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把下一秒内下的订单分散成一段时间来处理,这有些用户可能在下单几十秒后才能收到下单成功的 *** 作,但是比不能下单的体验要好。
2、应用解耦以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。当用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单 *** 作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户下单可以正常完成。当物流系统恢复后,继续处理订单信息即可,用户感受不到物流系统的故障,提升系统的可用性。
3、异步处理有些服务间调用是异步的,例如A调用B,B要花费很长时间执行,但是A需要知道B什么时候可以执行完,以前一般有两种方式,A过一段时间去调用B的查询api查询。或者A提供一个callback api,B执行完之后调用api通知A服务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,A调用B服务后,只需要监听B处理完成的消息,当B处理完成后,会发送一条消息给MQ,MQ会将此消息转发给A服务。这样A服务既不用循环调用B的查询api,也不用提供callback api。同样B服务也不用做这些 *** 作。A服务还能及时得到异步处理成功的消息。
MQ的分类:1、ActiveMQ
2、Kafka
3、RocketMQ
4、RabbitMQ
MQ的选择:1、Kafka
Kafka主要是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能,肯定首选Kafka。
2、RocketMQ
天生为金融互联网领域而圣,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RocketMQ在稳定性上可能更值得信赖,这些业务场景在双十一经历了多次考验2,如果业务上有上述并发场景,建议可以选择RocketMQ。
3、RabbitMQ
结合erlang语言本身的并发有事,性能好时效性微妙级,社区活跃度也比较高,管理界面用起来十分方便,如果数据量没有那么大,中小型公司优先选择功能比较晚辈的RabbitMQ。
RabbitMQ: RabbitMQ的概念:RabbitMQ是一个消息中间件:它接收并转发消息。可以把它当作一个快递赚点,当要发送一个包裹时,把包裹放到快递站,快递员最终会把快递送到收件人哪里,按照这种逻辑RabbitMQ是一个快递站,一个快递员帮忙传递快递。RabbitMQ与快递站的主要区别在于,它不处理快递而是接收,存储和转发消息数据。
四大核心概念:生产者:
产生数据发送消息的程序是生产者。
交换机:
交换机是RabbitMQ非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推
各个名词介绍:Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker
Virtual host:出于多租户的安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等
Connection:publisher/consumer和broker之间的TCP连接
Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了 *** 作系统建立TCP connection的开销。
Exchange:message到达broker的第一炸,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct(point-to-point),topic(publish-subscribe) and fanout(multicast)
Queue:消息最终被送到这里等待consumer取走
Binding:exchange和queue之间的虚拟连接, binding中可以包含routing key,Binding信息被保存到exchange中的查询表中,用于message的分发依据
RabbitMQ安装:·添加一个新的用户
创建账号:rabbitmqqctl add_user admin 123
设置用户角色:rabbitmqctl set_user_tags admin administrator
设置用户权限:set_permissions [-p
rabbitmqctl set_permission -p "/" admin ".*" ".*" ".*"
用户user_admin具有/vhost1这个virtual host中所有资源的配置、写、读权限
当前用户和角色:rabbitmqctl list_users
简单队列举例:使用Java编写两个程序,发送单个消息的生产者和接收消息并打印的消费者。在下图中,“p”是生产者,“c”是消费者。中间的框是一个队列RabbitMQ代表使用者保留的消息缓冲区。
依赖:
org.apache.maven.plugins maven-compiler-plugin8 com.rabbitmq 5.8.0 commons-io commons-io2.6
生产者代码:
生产者发送消息的
public class Producer{ //队列名称 public static final String QUEUE_NAME="hello"; //发消息 public static void main(String[] args){ //创建一个连接工厂 ConnectionFactory factory=new ConnectionFactory(); //工厂IP 连接RabbitMQ的队列 factory.setHost("162.168.200.129"); //用户名 factory.setUsername("123"); //创建连接 Connection connection=factory.newConnection(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); String message="hello world";//初次使用 channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("消息发送完毕"); } }
消费者代码:
消费者接收消息的
public class Consumer{ //队列的名称 public static final String QUEUE_NAME="hello"; //接收消息 public static void main(String[] args){ //创建连接工厂 ConnectionFactory factory=new ConnectionFactory(); factory.setHost("192.168.200.129"); factory.setUsername("admin"); factory.setPassword("123"); Connection connection=factory.newConnection(); Channel channel=connection.createChannel(); //声明 接收消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ System.out.println(new String(message.getBody())); }; //取消消息时的回调 CancelCallback cancelCallback=consumerTag ->{ System.out.println("消息消费被中断"); }; channel.basiccConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)