RabbitMQ

RabbitMQ,第1张

目录

RabbitMQ简介:

什么是MQ:

应用场景:

MQ不足:

什么时候使用MQ?

MQ主要解决哪些层面问题:

RabbitMQ快速入门:

下载安装:

配置环境变量:

启动管理工具:

RabbitMQ添加环境:

学习五种队列


RabbitMQ简介: 什么是MQ:

定义:消息队列

什么是队列:队列即为一个消息管道,以管道的形式去做消息传递,遵循先入先出的规则

消息队列也支持多线程

什么是消息队列
        MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

市面上常见的消息队列开源组件有哪些?

ActiveMQ:是老牌的消息中间件,国内很多公司过去运用的还是非常广泛,功能很强大。但问题在于没法确认ActiveMQ可以支撑互联网公司的高并发,高负载以及高吞吐的复杂场景,在国内的互联网公司落地的较少。

RabbitMQ:可以支撑互联网公司的高并发,高负载以及高吞吐的复杂场景,同样有非常完善便捷的后台管理界面可以使用。另外他还支持集群化,高可用部署架构,消息高可靠支持,功能较为完善。而且经过调研,国内各大互联网公司落地大规模RabbitMQ集群支撑自身业务的case较多,国内各种中小型互联网公司使用的RabbitMQ的实践也比较多。

RockMQ:是阿里开源,经过阿里的生产环境的超高并发,高吞吐的考验,性能卓越,同时还支持分布式事务等特殊场景,而且RocketMQ是基于Java语言开发,适合深入阅读源码,有需要可以站在源码层面解决线上生产问题,包括源码的二次开发和改造

Kafka:提供的消息中间件的功能明显较少一些,相对上述巨款MQ中间件要少很多。但是Kafka的优势在于专为超高吞吐量的实时日志采集,实时数据同步,实时数据计算等场景来设计。因此Kafka在大数据领域中配合实时计算技术(比如spark streaming,storm,flink)使用的较多。

应用场景:

1.在电商项目,例如双11秒杀抢购商品,结算时,界面会提醒我们稍等一下

MQ不足:
  1. 系统更复杂,多了一个MQ组件
  2. 消息传递路径更长,延时会增加
  3. 消息可靠性和重复性互为矛盾,消息不丢不重难以同时保证
  4. 上游无法知道下游的执行结果,这一点是很致命的

调用方实时依赖执行结果的业务场景,请使用调用,而不是MQ。

什么时候使用MQ?
  1. 数据驱动的任务依赖
  2. 上有不关心下游执行结果
  3. 异步返回执行时间长

为什么会产生消息队列?有什么原因?

不同进程(process)之间传递信息时,两个进程之间耦合程度过高,改动一个进程,

MQ主要解决哪些层面问题:
  • 应用解耦。如图所示。假设有系统B、C、D都需要系统A的数据,于是系统A调用三个方法发送数据到B、C、D。这时,系统D不需要了,那就需要在系统A把相关的代码删掉。假设这时有个新的系统E需要数据,这时系统A又要增加调用系统E的代码。为了降低这种强耦合,就可以使用MQ,系统A只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可。

  • 异步请求。如图所示。一个客户端请求发送进来,系统A会调用系统B、C、D三个系统,同步请求的话,响应时间就是系统A、B、C、D的总和,也就是800ms。如果使用MQ,系统A发送数据到MQ,然后就可以返回响应给客户端,不需要再等待系统B、C、D的响应,可以大大地提高性能。对于一些非必要的业务,比如发送短信,发送邮件等等,就可以采用MQ。

  • 流量削峰。如图所示。这其实是MQ一个很重要的应用。假设系统A在某一段时间请求数暴增,有5000个请求发送过来,系统A这时就会发送5000条SQL进入MySQL进行执行,MySQL对于如此庞大的请求当然处理不过来,MySQL就会崩溃,导致系统瘫痪。如果使用MQ,系统A不再是直接发送SQL到数据库,而是把数据发送到MQ,MQ短时间积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,防止在请求峰值时期大量的请求直接发送到MySQL导致系统崩溃。

RabbitMQ是一款使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开源消息中间件。首先要知道一些RabbitMQ的特点,官网可查:

  • 可靠性。支持持久化,传输确认,发布确认等保证了MQ的可靠性。
  • 灵活的分发消息策略。这应该是RabbitMQ的一大特点。在消息进入MQ前由Exchange(交换机)进行路由消息。分发消息策略有:简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式。
  • 支持集群。多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 多种协议。RabbitMQ支持多种消息队列协议,比如 STOMP、MQTT 等等。
  • 支持多种语言客户端。RabbitMQ几乎支持所有常用编程语言,包括 Java、.NET、Ruby 等等。
  • 可视化管理界面。RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker。
  • 插件机制。RabbitMQ提供了许多插件,可以通过插件进行扩展,也可以编写自己的插件。

RabbitMQ快速入门:

        RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。RabbitMQ官方地址:http://www.rabbitmq.com

下载安装:
        RabbitMQ由Erlang语言开发,需要安装与RabbitMQ版本对应的Erlang语言环境。RabbitMQ官网下载地址:http://www.rabbitmq.com/download.html 配置环境变量:
  1. 在环境变量中新增ERLANG——HOME,路径为erl的路径,不包含bin
  2. 在Path变量里新增%ERLANG——HOME%\bin
  3. 打开cmd输入erl查看是否成功
启动管理工具:

1.双击 RabbitMQ Command Prompt(sbin dir)

2.输入命令:rabbitmq-plugins enable rabbitmq_management

这样就启动管理工具了,可以试一下命令:

停止:net stop RabbitMQ 等价于 RabbitMQ Service -stop

启动:net start RabbitMQ 等价于 RabbitMQ Service -start

3.在浏览器输入地址查看:http://127.0.0.1:15672/

4.输入账号名和密码:guest

RabbitMQ的工作原理
下图是RabbitMQ的基本结构:

 组成部分说明:

Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
生产者发送消息流程:

1、生产者和Broker建立TCP连接。

2、生产者和Broker建立通道。

3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。

4、Exchange将消息转发到指定的Queue(队列)

消费者接收消息流程:

1、消费者和Broker建立TCP连接

2、消费者和Broker建立通道

3、消费者监听指定的Queue(队列)

4、当有消息到达Queue时Broker默认将消息推送给消费者。

5、消费者接收到消息。

6、ack回复

RabbitMQ添加环境:

添加用在Admin界面中添加user 

用户角色
1、超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行 *** 作。
2、监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
3、策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
4、普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
5、其他
无法登陆管理控制台,通常就是普通的生产者和消费者。

创建Virtual Hosts

选中Admin用户,设置权限:

看到权限已加:

4.4.管理界面中的功能

学习五种队列

消息消费方 》 MQ 》 消息生产方

实例(生产方提供消息发送至中间件)上游发送消息到中间件:

config.properties

rabbitmq-server-address=localhost
rabbitmq-server-port=5672
rabbitmq-server-virtualHost=testUser
rabbitmq-server-vm=q_test2022425
rabbitmq-server-username=admin
rabbitmq-server-password=admin
private static String SERVER_ADDRESS;
    private static Integer SERVER_PORT;
    private static String SERVER_VIRTUAL_HOST;
    private static String SERVER_VM;
    private static String SERVER_USERNAME;
    private static String SERVER_PASSWORD;

    static {
        try(InputStream inputStream = MessageOperator.class.getResourceAsStream("/config.properties");) {
            Properties properties = new Properties();
            properties.load(inputStream);
            SERVER_ADDRESS = properties.getProperty("rabbitmq-server-address");
            SERVER_PORT = Integer.parseInt(properties.getProperty("rabbitmq-server-port"));
            SERVER_VIRTUAL_HOST = properties.getProperty("rabbitmq-server-virtualHost");
            SERVER_VM = properties.getProperty("rabbitmq-server-vm");
            SERVER_USERNAME = properties.getProperty("rabbitmq-server-username");
            SERVER_PASSWORD = properties.getProperty("rabbitmq-server-password");
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public void sendToMQ(String message){
        //1.创建连接工厂对象
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置 *** 作的虚拟机
        factory.setVirtualHost(SERVER_VIRTUAL_HOST);
        //3.设置 *** 作MQ服务器的主机以及端口号
        factory.setHost(SERVER_ADDRESS);
        factory.setPort(SERVER_PORT);
        //4.设置账号和密码
        factory.setUsername(SERVER_USERNAME);
        factory.setPassword(SERVER_PASSWORD);
        Connection newConnection = null;
        Channel channel = null;
        try {
            //5.创建通信连接(创建时可能会出现io异常)
            newConnection = factory.newConnection();
            //6.创建通道
            channel = newConnection.createChannel();
            //7.创建队列
            channel.queueDeclare(SERVER_VM,false,false,false,null);
            //8.将消息发送至RabbitMQ
            channel.basicPublish("",SERVER_VM,null,message.getBytes());
            if (channel.waitForConfirms()){
                System.out.println("发送成功!");
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
                newConnection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

 实例(消费方从中间件接收消息)下游接收消息从中间件获取上游消息:

private static String SERVER_ADDRESS;
    private static Integer SERVER_PORT;
    private static String SERVER_VIRTUAL_HOST;
    private static String SERVER_VM;
    private static String SERVER_USERNAME;
    private static String SERVER_PASSWORD;

    static {
        try(InputStream inputStream = MessageOperator.class.getResourceAsStream("/config.properties");) {
            Properties properties = new Properties();
            properties.load(inputStream);
            SERVER_ADDRESS = properties.getProperty("rabbitmq-server-address");
            SERVER_PORT = Integer.parseInt(properties.getProperty("rabbitmq-server-port"));
            SERVER_VIRTUAL_HOST = properties.getProperty("rabbitmq-server-virtualHost");
            SERVER_VM = properties.getProperty("rabbitmq-server-vm");
            SERVER_USERNAME = properties.getProperty("rabbitmq-server-username");
            SERVER_PASSWORD = properties.getProperty("rabbitmq-server-password");
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public void receiveToMQ(){
        //1.创建连接工厂对象
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置 *** 作的虚拟机
        factory.setVirtualHost(SERVER_VIRTUAL_HOST);
        //3.设置 *** 作MQ服务器的主机以及端口号
        factory.setHost(SERVER_ADDRESS);
        factory.setPort(SERVER_PORT);
        //4.设置账号和密码
        factory.setUsername(SERVER_USERNAME);
        factory.setPassword(SERVER_PASSWORD);
        Connection newConnection = null;
        Channel channel = null;
        try {
            //5.创建通信连接(创建时可能会出现io异常)
            newConnection = factory.newConnection();
            //6.创建通道
            channel = newConnection.createChannel();
            //7.创建队列
            channel.queueDeclare(SERVER_VM,false,false,false,null);
            //8.将从RabbitMQ接收消息
            QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
            channel.basicConsume(SERVER_VM,true,queueingConsumer);
            while (true){
                QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                System.out.println("下游接收模块已接受上游信息:"+new String(delivery.getBody()));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                channel.close();
                newConnection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

加锁:

(生产方):

public void sendToMQ(String message){
        //1.创建连接工厂对象
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置 *** 作的虚拟机
        factory.setVirtualHost(SERVER_VIRTUAL_HOST);
        //3.设置 *** 作MQ服务器的主机以及端口号
        factory.setHost(SERVER_ADDRESS);
        factory.setPort(SERVER_PORT);
        //4.设置账号和密码
        factory.setUsername(SERVER_USERNAME);
        factory.setPassword(SERVER_PASSWORD);
        Connection newConnection = null;
        Channel channel = null;
        try {
            //5.创建通信连接(创建时可能会出现io异常)
            newConnection = factory.newConnection();
            //6.创建通道
            channel = newConnection.createChannel();
            //7.创建队列
            //b持久化   b1独占一个队列   b2在空闲时候自动删除
            channel.confirmSelect();
            channel.queueDeclare(SERVER_VM,true,false,true,null);
            //8.将消息发送至RabbitMQ
            channel.basicPublish("",SERVER_VM,null,message.getBytes());
            if (channel.waitForConfirms()){
                System.out.println("发送成功!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

(消费方):

public void receiveToMQ(){
        //1.创建连接工厂对象
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置 *** 作的虚拟机
        factory.setVirtualHost(SERVER_VIRTUAL_HOST);
        //3.设置 *** 作MQ服务器的主机以及端口号
        factory.setHost(SERVER_ADDRESS);
        factory.setPort(SERVER_PORT);
        //4.设置账号和密码
        factory.setUsername(SERVER_USERNAME);
        factory.setPassword(SERVER_PASSWORD);
        Connection newConnection = null;
        Channel channel = null;
        try {
            //5.创建通信连接(创建时可能会出现io异常)
            newConnection = factory.newConnection();
            //6.创建通道
            channel = newConnection.createChannel();
            //7.创建队列
            channel.queueDeclare(SERVER_VM,true,false,true,null);
            //8.将从RabbitMQ接收消息
            //回调
            DeliverCallback deliverCallback = (consumerTag,delivery) -> {
                String message = "下游接收模块已接受上游信息:"+new String(delivery.getBody());
                System.out.println("[x] Received"+message);
            };
            //参数2设置为true代表关闭自动ack机制(MQ自动删除数据)
            channel.basicConsume(SERVER_VM,true,deliverCallback,consumerTag -> {});
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

ack(使用5版本的代码):

(生产方):

public void sendByMQ(){
        //1.创建连接工厂对象
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置 *** 作的虚拟机
        factory.setVirtualHost(SERVER_VIRTUAL_HOST);
        //3.设置 *** 作MQ服务器的主机以及端口号
        factory.setHost(SERVER_ADDRESS);
        factory.setPort(SERVER_PORT);
        //4.设置账号和密码
        factory.setUsername(SERVER_USERNAME);
        factory.setPassword(SERVER_PASSWORD);
        Connection newConnection = null;
        Channel channel = null;
        try {
            //5.创建通信连接(创建时可能会出现io异常)
            newConnection = factory.newConnection();
            //6.创建通道
            channel = newConnection.createChannel();
            //7.创建队列
            //b持久化   b1独占一个队列   b2在空闲时候自动删除
            channel.queueDeclare(SERVER_VM,true,false,true,null);
            channel.basicQos(1);//通道索引为1
            for (int i = 0; i < 30; i++) {
                String msg = "【美团外卖】订单编号:"+(i+1)+",正在提交订单";
                System.out.println(msg);
                channel.basicPublish("",SERVER_VM,null,msg.getBytes());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

(消费方):

public void getMsg() {
        //1.创建连接工厂对象
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置 *** 作的虚拟机
        factory.setVirtualHost(SERVER_VIRTUAL_HOST);
        //3.设置 *** 作MQ服务器的主机以及端口号
        factory.setHost(SERVER_ADDRESS);
        factory.setPort(SERVER_PORT);
        //4.设置账号和密码
        factory.setUsername(SERVER_USERNAME);
        factory.setPassword(SERVER_PASSWORD);
        Connection newConnection = null;

        try {
            //5.创建通信连接(创建时可能会出现io异常)
            newConnection = factory.newConnection();
            //6.创建通道
            final Channel channel = newConnection.createChannel();
            //7.创建队列
            channel.queueDeclare(SERVER_VM, true, false, true, null);
            channel.basicQos(1);
            DeliverCallback deliverCallback = (consumerTag,delivery) -> {
                String msg = new String(delivery.getBody(),"UTF-8");
                try {
                    Thread.sleep(800);
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    System.out.println("下游接收成功!");
                    System.out.println("Received :"+msg);
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                }
            };
            channel.basicConsume(SERVER_VM,false,deliverCallback,consumerTag -> {});
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/756193.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-30
下一篇 2022-04-30

发表评论

登录后才能评论

评论列表(0条)

保存