RabbitMQ使用

RabbitMQ使用,第1张

RabbitMQ使用 一、引言

模块之间的耦合度过高,一旦一个模块宕机后,全部功能都不能用了,并且同步通讯的成本过高,用户体验差。

RabbitMQ引言

二、RabbitMQ介绍

市面上比较火爆的几款MQ:

ActiveMQ,RocketMQ,Kafka,RabbitMQ。

  • 语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多门语言,RabbitMQ支持多种语言。

  • 效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的。

  • 消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。

  • 学习成本:RabbitMQ非常简单。

RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal,由Erlang语言开发(并发的编程语言)

RabbitMQ严格的遵循AMQP协议,高级消息队列协议,帮助我们在进程之间传递异步消息。

RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控制、负载均衡等特性,使得RabbitMQ拥有更加广泛的应用场景。RabbitMQ跟Erlang和AMQP有关。下面简单介绍一下Erlang和AMQP。

Erlang是一门动态类型的函数式编程语言,它也是一门解释型语言,由Erlang虚拟机解释执行。从语言模型上说,Erlang是基于Actor模型的实现。在Actor模型里面,万物皆Actor,每个Actor都封装着内部状态,Actor相互之间只能通过消息传递这一种方式来进行通信。对应到Erlang里,每个Actor对应着一个Erlang进程,进程之间通过消息传递进行通信。相比共享内存,进程间通过消息传递来通信带来的直接好处就是消除了直接的锁开销(不考虑Erlang虚拟机底层实现中的锁应用)。

AMQP(Advanced Message Queue Protocol)定义了一种消息系统规范。这个规范描述了在一个分布式的系统中各个子系统如何通过消息交互。而RabbitMQ则是AMQP的一种基于erlang的实现。AMQP将分布式系统中各个子系统隔离开来,子系统之间不再有依赖。子系统仅依赖于消息。子系统不关心消息的发送者,也不关心消息的接受者。

优点

1、解耦:降低系统模块的耦合度

2、提高系统响应时间

3、异步消息

4、过载保护,流量削峰

1.应用解耦

场景:双11购物,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.

这种做法有一个缺点:

  • 当库存系统出现故障时,订单就会失败.

  • 订单系统和库存系统高耦合.

 

引入消息队列

  • 订单系统: 用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

  • 库存系统: 订阅下单的消息,获取下单消息,进行库 *** 作。 就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失

 

2.异步处理 场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种 1. 串行的方式 2. 并行的方式

串行方式: 将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西.

 

并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间

 

假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并性已经提高的处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,英爱是写入数据库后就返回.

消息队列

引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍

 

3.流量削峰

流量削峰一般在秒杀活动中应用广泛

场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。

作用: 1.可以控制活动人数,超过此一定阀值的订单直接丢弃

2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)

 

这样我们就可以采用队列的机制来处理,如同我们在超市结算一样,并不会一窝蜂一样涌入收银台,而是排队结算,一个接着一个的处理,不能插队,因为同时结算就只能达到这么多。

三、RabbitMQ安装
version: "3.1"
services:
  rabbitmq:
    image: daocloud.io/library/rabbitmq:management
    restart: always
    container_name: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672
    volumes:
      - ./data:/var/lib/rabbitmq
[root@iz8vbdmrir2n6xqzrbd93hz /]# cd /opt
[root@iz8vbdmrir2n6xqzrbd93hz opt]# ls
containerd  docker_mysql_tomcat  docker_nginx  docker_ssm  yangl
[root@iz8vbdmrir2n6xqzrbd93hz opt]# mkdir docker_rabbitmq
[root@iz8vbdmrir2n6xqzrbd93hz opt]# cd docker_rabbitmq/
[root@iz8vbdmrir2n6xqzrbd93hz docker_rabbitmq]# vim docker-compose.yml
[root@iz8vbdmrir2n6xqzrbd93hz docker_rabbitmq]# docker-compose-Linux-x86_64 up -d
Creating network "docker_rabbitmq_default" with the default driver
Pulling rabbitmq (daocloud.io/library/rabbitmq:management)...
management: Pulling from library/rabbitmq
d7c3167c320d: Pulling fs layer
d7c3167c320d: Pull complete
131f805ec7fd: Pull complete
322ed380e680: Pull complete
6ac240b13098: Pull complete
58ab633708c7: Pull complete
4ef7b4c52e3f: Pull complete
0bcc8241708b: Pull complete
4bbf89f47f34: Pull complete
2dcee968b577: Pull complete
8f702d8e2d02: Pull complete
5159883c6988: Pull complete
ba6d73924acf: Pull complete
Digest: sha256:a9f93f113e1bbcd1de7035bdc433be68fa04086d672464233d339fa0a52b9747
Status: Downloaded newer image for daocloud.io/library/rabbitmq:management
Creating rabbitmq ... done
[root@iz8vbdmrir2n6xqzrbd93hz docker_rabbitmq]# docker ps
ConTAINER ID        IMAGE                                     COMMAND                  CREATED             STATUS              PORTS                                                                                        NAMES
a9ecaea4f19a        daocloud.io/library/rabbitmq:management   "docker-entrypoint.s…"   10 seconds ago      Up 9 seconds        4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp   rabbitmq
​

浏览器访问:http://ip:15672 (注:云服务器记得开放 15672 和 5672 端口)

用户名和密码默认都是:guest

四、RabbitMQ架构【重点】

4.1 官方的简单架构图

  • Publisher - 生产者:发布消息到RabbitMQ中的Exchange

  • Consumer - 消费者:监听RabbitMQ中的Queue中的消息

  • Exchange - 交换机:和生产者建立连接并接收生产者的消息

  • Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互

  • Routes - 路由:交换机以什么样的策略将消息发布到Queue

简单架构图

 

4.2 RabbitMQ的完整架构图

完整架构图

完整架构图

 

4.3 RabbitMQ 通讯方式

RabbitMQ Tutorials — RabbitMQ

4.4 Hello-World案例演示

  1. 导入依赖


    com.rabbitmq
    amqp-client
    5.9.0
  1. 创建生产者 Publisher

package com.qf.rabbitmq;
​
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
​
//生产者
public class Publisher {
    public static void main(String[] args) throws Exception{
        System.out.println("Publisher...");
        //配置连接参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.25.132");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        //获取连接
        Connection connection = factory.newConnection();
        //获取Channel
        Channel channel = connection.createChannel();
        //配置队列参数
        //参数1:queue - 指定队列的名称
        //参数2:durable - 当前队列是否需要持久化,值为true时表示持久化,rabbitmq宕机或重启后,队列依然在
        //参数3:exclusive - 当前队列是否为排他队列,值为true时表示与当前连接(connection)绑定,连接关闭,队列消失
        //参数4:autoDelete - 当前队列是否自动删除,值为true时表示队列中的消息一旦被消费,该队列会消失
        //参数5:arguments - 指定当前队列的相关参数
        channel.queueDeclare("helloworldQueue",false,false,false,null);
        //发布消息到exchange,同时指定路由的规则
        // 参数1:指定exchange,使用""
        // 参数2:指定路由的规则,使用具体的队列名称
        // 参数3:指定传递的消息所携带的properties,使用null
        // 参数4:指定发布的具体消息,byte[]类型
        channel.basicPublish("","helloworldQueue",null,"helloworld".getBytes());
        //关闭资源
        //channel.close();
        //connection.close();
    }
}
  1. 创建消费者 Consumer

package com.qf.rabbitmq;
​
import com.rabbitmq.client.*;
import java.io.IOException;
​
//消费者
public class Consumer {
    public static void main(String[] args)throws Exception {
        System.out.println("Consumer...");
        //配置连接参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.25.132");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        //获取连接
        Connection connection = factory.newConnection();
​
        Channel channel = connection.createChannel();
        //要与生产者中的该方法一致(注:方法中的参数值必须保持一致)
        channel.queueDeclare("helloworldQueue",false,false,false,null);
​
        //监听队列
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
​
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("来自生产者的消息:"+new String(body));
            }
        };
​
        //消费消息
        //参数1:queue - 指定消费哪个队列
        //参数2:autoAck - 指定是否自动ACK (true表示接收到消息后,会立即告知RabbitMQ,false表示不告知)
        //参数3:consumer - 指定消费回调
        channel.basicConsume("helloworldQueue",true,defaultConsumer);
    }
}

分别启动生产者和消费者进行测试(生产一次才能消费一次)

4.5 基本原理

RabbitMQ是消息队列的一种实现,那么一个消息队列到底需要什么?答案是队列,即Queue,那么接下来所有名词都是围绕这个Queue来拓展的。

就RabbimtMQ而言,Queue是其中的一个逻辑上的实现,我们需要连接到RabbitMQ来 *** 作队列进而实现业务功能,所以就会有Connection,我们发一条消息连接一次,这样很显然是浪费资源的,建立连接的过程也很耗时,所以我们就会做一个东西让他来管理连接,当我用的时候,直接从里边拿出来已经建立好的连接发信息,那么ConnectionFactory应运而生。

接下来,当程序开发时,可能不止用到一个队列,可能有订单的队列、消息的队列、任务的队列等等,那么就需要给不同的queue发信息,那么和每一个队列连接的这个概念,就叫Channel。

再往下来,当我们开发的时候还有时候会用到这样一种功能,就是当我发送一条消息,需要让几个queue都收到,那么怎么解决这个问题呢,难道我要给每一个queue发送一次消息?那岂不是浪费带宽又浪费资源,我们能想到什么办法呢,当然是我们发送给RabbitMQ服务器一次,然后让RabbitMQ服务器自己解析需要给哪个Queue发,那么Exchange就是干这件事的 但是我们给Exchange发消息,他怎么知道给哪个Queue发呢?这里就用到了RoutingKey和BindingKey BindingKey是Exchange和Queue绑定的规则描述,这个描述用来解析当Exchange接收到消息时,Exchange接收到的消息会带有RoutingKey这个字段,Exchange就是根据这个RoutingKey和当前Exchange所有绑定的BindingKey做匹配,如果满足要求,就往BindingKey所绑定的Queue发送消息,这样我们就解决了我们向RabbitMQ发送一次消息,可以分发到不同的Queue的过程

至此,我们就把所有的名词贯通咯,接下来做个概要描述:

  • Broker:提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输

  • ConnectionFactory:与RabbitMQ服务器连接的管理器

  • Connection:与RabbitMQ服务器的TCP连接

  • Channel:与Exchange的连接,一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,为了多路复用。RabbitMQ建议客户端线程之间不要共用Channel,但是建议尽量共用Connection。

  • Queue:消息的载体,每个消息都会被投到一个或多个队列。

  • Exchange:接受消息生产者的消息,并根据消息的RoutingKey和 Exchange绑定的BindingKey,以及Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不同类型的Exchange路由的行为是不一样的。

  • Message Queue:消息队列,用于存储还未被消费者消费的消息。

  • Message: 由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。

  • RoutingKey:由Producer发送Message时指定,指定当前消息被谁接受

  • BindingKey:由Consumer在Binding Exchange与Message Queue时指定,指定当前Exchange下,什么样的RoutingKey会被下派到当前绑定的Queue中

  • Binding:联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。

  • Server: 接受客户端连接,实现AMQP消息队列和路由功能的进程。

  • Virtual Host:其实是一个虚拟概念,类似于权限控制组,可以通过命令分配给用户Virtual Host的权限,默认的guest用户是管理员权限,初始空间有/,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host

五、SpringBoot整合RabbitMQ的使用【重点】

5.1 导入依赖


   org.springframework.boot
   spring-boot-starter-amqp

5.2 在application.properties中增加配置

#对于rabbitMQ的支持
spring.rabbitmq.host=192.168.153.136
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

5.3 Hello-World 简单队列

一个生产者,一个默认的交换机,一个队列,一个消费者

结构图

1)创建配置类,用于创建队列对象

package com.qf.simple;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
​
@Configuration
public class SimpleQueueConfig {
​
    @Bean
    public Queue simple(){
        return new Queue("simpleQueue");
    }
}

2)创建生产者

package com.qf.simple;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
​
@Component
public class SimpleQueueProducer {
​
    @Autowired
    private RabbitTemplate rabbitTemplate;
​
    public void send(){
        System.out.println("SimpleQueueProducer");
        //发送消息,第一个参数为队列名称,第二参数为消息内容
        rabbitTemplate.convertAndSend("simpleQueue","简单模式");
    }
}

3)创建消费者

package com.qf.simple;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
@Component
@RabbitListener(queues="simpleQueue")//监听指定的消息队列
public class SimpleQueueCustomer {
    
    //@RabbitHandler修饰的方法中实现接受到消息后的处理逻辑
    @RabbitHandler
    public void receive(String content){
        System.out.println("SimpleQueueCustomer");
        System.out.println("来SimpleQueueProducer的信息:"+content);
    }
}

4)在srctestjavacomqfRabbitmq01ApplicationTests.java进行测试

package com.qf;
import com.qf.simple.SimpleQueueProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
​
@SpringBootTest
class Rabbitmq01ApplicationTests {
    
    @Test
    void contextLoads() {
    }
    
    @Autowired
    private SimpleQueueProducer simpleQueueProducer;
​
    @Test
    public void testSimpleQueueProducer(){
        simpleQueueProducer.send();
    }
}

如果传递的是 JavaBean 对象,该实体类需要实现序列化接口,具体流程如下:

  1. 导入lombok依赖,创建User类

package com.qf.pojo;
​
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
​
import java.io.Serializable;
​
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User implements Serializable {
    private String username;
    private String password;
}
  1. 修改生产者中的代码

package com.qf.simple;
​
import com.qf.pojo.User;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
​
//生产者
@Component
public class SimplePublisher {
​
    @Autowired
    private RabbitTemplate rabbitTemplate;
​
    public void send(){
        System.out.println("SimplePublisher...");
        //rabbitTemplate.convertAndSend("","simpleQueue","简单模式");
        rabbitTemplate.convertAndSend("","simpleQueue",new User("张三","123"));
    }
}
​
  1. 修改消费者中的代码

package com.qf.simple;
​
import com.qf.pojo.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
//消费者
@Component
@RabbitListener(queues = "simpleQueue")
public class SimpleConsumer {
​
//    @RabbitHandler
//    public void receive(String content){
//        System.out.println("SimpleConsumer...");
//        System.out.println("来自SimplePublisher的消息:"+content);
//    }
​
    @RabbitHandler
    public void receive(User user){
        System.out.println("SimpleConsumer...");
        System.out.println("来自SimplePublisher的消息:"+user);
    }
}
  1. 运行测试类即可!

5.4 Work 工作队列

一个生产者,一个默认的交换机,一个队列,两个消费者,默认采用公平分发

结构图

 

1)创建配置类,用于创建队列对象

package com.qf.work;
​
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
​
@Configuration
public class WorkQueueConfig {
    
    @Bean
    public Queue work(){
        return new Queue("workQueue");
    }
}

2)创建生产者

package com.qf.work;
​
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
​
@Component
public class WorkQueueProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void send(){
        System.out.println("WorkQueueProducer");
        rabbitTemplate.convertAndSend("workQueue","工作队列模式");
    }
}

3)创建消费者,本案例创建两个消费者

package com.qf.work;
​
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
@Component
@RabbitListener(queues="workQueue")
public class WorkQueueCustomer_01 {
    
    @RabbitHandler
    public void receive(String content){
        System.out.println("WorkQueueCustomer_01:"+content);
    }
}
package com.qf.work;
​
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
@Component
@RabbitListener(queues="workQueue")
public class WorkQueueCustomer_02 {
​
    @RabbitHandler
    public void receive(String content){
        System.out.println("WorkQueueCustomer_02:"+content);
    }
}

4)在测试类中添加对象和方法进行测试

@Autowired
private WorkQueueProducer workQueueProducer;
​
@Test
public void testWorkQueueProducer(){
​
    for (int i = 0; i<4; i++){
        workQueueProducer.send();
    }
}

5.5 Publish/Subscribe 发布订阅模式

一个生产者,一个交换机,两个队列,两个消费者

结构图

 

使用该模式需要借助交换机,生产者将消息发送到交换机,再通过交换机到达队列.

有四种交换机:direct/topic/headers/fanout,默认交换机是direct,发布与订阅的实现使用第四个交换器类型fanout

使用交换机时,每个消费者有自己的队列,生产者将消息发送到交换机(X),每个队列都要绑定到交换机

本例中:

创建2个消息队列

创建一个fanout交换机对象

Bind交换机和队列

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5679288.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存