RabbitMQ

RabbitMQ,第1张

RabbitMQ MQ相关概念: 什么是MQ:

        MQ(message queue),从字面意思来看,本质是个队列,FIFO先入后出,只不过对俄中存放的内容是message而已,还是一种跨进程的通信机制,用于上下游传递信息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。

为什么要用MQ 1、流量消峰

        举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段的下单时绰绰有余,正常时段我们下单一秒就能返回结果。但是在高峰期,如果有两万次下单 *** 作系统是处理不了的,只能限制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把下一秒内下的订单分散成一段时间来处理,这有些用户可能在下单几十秒后才能收到下单成功的 *** 作,但是比不能下单的体验要好。

2、应用解耦

        以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。当用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单 *** 作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户下单可以正常完成。当物流系统恢复后,继续处理订单信息即可,用户感受不到物流系统的故障,提升系统的可用性。

3、异步处理

        有些服务间调用是异步的,例如A调用B,B要花费很长时间执行,但是A需要知道B什么时候可以执行完,以前一般有两种方式,A过一段时间去调用B的查询api查询。或者A提供一个callback api,B执行完之后调用api通知A服务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,A调用B服务后,只需要监听B处理完成的消息,当B处理完成后,会发送一条消息给MQ,MQ会将此消息转发给A服务。这样A服务既不用循环调用B的查询api,也不用提供callback api。同样B服务也不用做这些 *** 作。A服务还能及时得到异步处理成功的消息。

MQ的分类:

1、ActiveMQ

2、Kafka

3、RocketMQ

4、RabbitMQ

MQ的选择:

1、Kafka

        Kafka主要是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能,肯定首选Kafka。

2、RocketMQ

        天生为金融互联网领域而圣,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RocketMQ在稳定性上可能更值得信赖,这些业务场景在双十一经历了多次考验2,如果业务上有上述并发场景,建议可以选择RocketMQ。

3、RabbitMQ

        结合erlang语言本身的并发有事,性能好时效性微妙级,社区活跃度也比较高,管理界面用起来十分方便,如果数据量没有那么大,中小型公司优先选择功能比较晚辈的RabbitMQ。

RabbitMQ: RabbitMQ的概念:

        RabbitMQ是一个消息中间件:它接收并转发消息。可以把它当作一个快递赚点,当要发送一个包裹时,把包裹放到快递站,快递员最终会把快递送到收件人哪里,按照这种逻辑RabbitMQ是一个快递站,一个快递员帮忙传递快递。RabbitMQ与快递站的主要区别在于,它不处理快递而是接收,存储和转发消息数据。

四大核心概念:

生产者

        产生数据发送消息的程序是生产者

交换机:

        交换机是RabbitMQ非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推

各个名词介绍:

        Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker

        Virtual host:出于多租户的安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等

        Connection:publisher/consumer和broker之间的TCP连接

        Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了 *** 作系统建立TCP connection的开销。

        Exchange:message到达broker的第一炸,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct(point-to-point),topic(publish-subscribe) and fanout(multicast)

        Queue:消息最终被送到这里等待consumer取走

        Binding:exchange和queue之间的虚拟连接, binding中可以包含routing key,Binding信息被保存到exchange中的查询表中,用于message的分发依据

RabbitMQ安装:

·添加一个新的用户

        创建账号:rabbitmqqctl add_user admin 123

        设置用户角色:rabbitmqctl set_user_tags admin administrator

        设置用户权限:set_permissions [-p ]

                                  rabbitmqctl set_permission -p "/" admin ".*" ".*" ".*"

                                  用户user_admin具有/vhost1这个virtual host中所有资源的配置、写、读权限

        当前用户和角色:rabbitmqctl list_users

简单队列举例:

使用Java编写两个程序,发送单个消息的生产者和接收消息并打印的消费者。在下图中,“p”是生产者,“c”是消费者。中间的框是一个队列RabbitMQ代表使用者保留的消息缓冲区。

依赖:


    
    
        org.apache.maven.plugins
        maven-compiler-plugin
        
            8
            8
        
    
    



    
    
        com.rabbitmq
        5.8.0
    
    
    
        commons-io
        commons-io
        2.6
    

生产者代码:

 生产者发送消息的

public class Producer{
    //队列名称
    public static final String QUEUE_NAME="hello";

    //发消息
    public static void main(String[] args){
        //创建一个连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //工厂IP 连接RabbitMQ的队列
        factory.setHost("162.168.200.129");
        //用户名
        factory.setUsername("123");
        //创建连接
        Connection connection=factory.newConnection();
        
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            String message="hello world";//初次使用
        
        
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息发送完毕");
    }
}

消费者代码:

消费者接收消息的

public class Consumer{
    //队列的名称
    public static final String QUEUE_NAME="hello";
    //接收消息
    public static void main(String[] args){
        //创建连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.200.129");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection=factory.newConnection();

        Channel channel=connection.createChannel();

        //声明 接收消息
        DeliverCallback deliverCallback=(consumerTag,message) ->{
            System.out.println(new String(message.getBody()));
        };

        //取消消息时的回调
        CancelCallback cancelCallback=consumerTag ->{
            System.out.println("消息消费被中断");
        };

        
        channel.basiccConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5679079.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存