MQ介绍,RabbitMQ在SpringAMQP中的使用

MQ介绍,RabbitMQ在SpringAMQP中的使用,第1张

MQ介绍,RabbitMQ在SpringAMQP中的使用 同步调用
  • 优点:时效性强,可以立即得到结果
  • 缺点:
  1. 耦合度高,每次加入新需求都要该原来的代码
  2. 性能和吞吐能力下降,调用者需要等待提供者响应后才能继续下一步 *** 作
  3. 有额外资源消耗,调用者在等待服务响应过程中,不能释放请求占用的资源
  4. 有级联失效问题,如果服务提供者出现问题,所有调用方都会跟着出现问题
异步调用——通过Broker代理,调用者在请求broker后可以立即返回,无需等待所有结果返回后再响应,这里引用黑马的图片,很直观

优点:

  1. 耦合度低
  2. 吞吐量提升
  3. 故障隔离
  4. 流量削峰:当有大量请求时,可以先放在broker里,服务器根据自己的处理速度再去broker里处理请求,以此来缓解服务器压力

缺点:

  1. 依赖于Broker的可靠性、安全性、吞吐能力,所以必须保证broker足够可靠
  2. 架构复杂,业务没有明显的业务流程线,不好追踪管理
MQ简介

MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是上述中事件驱动架构中的Broker,下面通过黑马的截图来看看几种常见的MQ区别

 RabbitMQ学习

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网:https://www.rabbitmq.com/,结构如下

基本概念

  • channel: *** 作MQ的工具
  • exchange:路由消息到队列中
  • queue:缓存消息的队列
  • virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组

安装

这次安装是在以前安装好的docker容器上进行的安装,步骤如下

  1. 下载RabbitMQ
    docker pull rabbitmq:3-management  #冒号后面是版本号

    阿萨

  2. 下载好后运行容器

    docker run 
     -e RABBITMQ_DEFAULT_USER=itcast     #用户名
     -e RABBITMQ_DEFAULT_PASS=123321     #密码  
     --name mq     #容器名字
     --hostname mq1     #主机名 
     -p 15672:15672     #rabbitmq的管理平台端口,可通过该端口登录管理平台
     -p 5672:5672       #收发消息的端口  
     -d     #后台运行
     rabbitmq:3-management #镜像名称

    完成后就安装好了

可以通过输入IP地址和端口在浏览器访问

常见的消息模型

基本消息队列,下图是官网解释

工作消息队列概念

以上两种都是通过队列来发送,没有exchange交换机的概念,并且消息被消费后立即销毁

以下三种都属于发布订阅模式(允许将同一个消息发送给多个消费者),根据交换机类型不同又分为三类,分别是

  • 广播模式(Fanout),一次向多个消费者发送消息,x指的是exchange交换机
  • 路由模式(Direct),选择性接收消息

  • 主题模式(Topic),基于模式(主题)接收消息

基本消息队列消息发送流程

  1. 创建connection
  2. 创建channel
  3. 利用channel声明队列
  4. 利用channel向队列发送消息

基本消息队列消息接收流程

  1. 创建connection
  2. 创建channel
  3. 利用channel声明队列
  4. 定义consumer的消费行为handleDelivery()
  5. 利用channel将消费者与队列绑定
SpringAMQP

AMQP:是用于在应用程序或之间传递业务消息的开放标准,该协议与语言和平台无关,更符合微服务中独立性的要求

SpringAMQP:是基于AMQP协议的一套api规范,提供了模板来收发消息。包括了两部分,其中spring-amqp是基础抽象,spring-rabbit是底层默认实现的,官网地址Spring AMQP

通俗来讲就是通过官方原生的方法来创建消息队列太麻烦了,spring现在通过封装底层实现来帮我们简化了创建流程和代码难度

下面通过SpringAMQP创建做一个最简单的基本消息队列

  1. 创建一个父工程并引入相关依赖
     
            
                org.springframework.boot
                spring-boot-starter-amqp
            
            
            
                org.springframework.boot
                spring-boot-starter-test
            
  2. 创建推送者Publisher子项目,并且配置yml文件

    logging:
      pattern:
        dateformat: MM-dd HH:mm:ss:SSS
    spring:
      rabbitmq:
        host: 192.168.65.129 # rabbitMQ的ip地址
        port: 5672 # rabbitmq收发消息的端口
        username: itcast    #  rabbitmq用户名
        password: 123321    #  rabbitmq密码
        virtual-host: /   #虚拟主机名字
  3. 写一个单元测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {  
@Autowired
    private RabbitTemplate rabbitTemplate; //引入rabbitmq的协议标准对象

    @Test
    public void testSendMessage2SimpleQueue() {
        String queueName = "simple.queue"; //队列名字
        String message = "hello, spring amqp!";  //队列消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

这里注意,我指定的队列名字叫做"simple.queue",但是在rabbitmq里面没有这个队列,需要我手动去创建一个,如下

 创建成功后,启动单元测试,点击队列名字,看一下效果

 以上就是发送消息到队列的过程,但并没有接收消息,现在写如何接收队列里的消息

  1. 创建Consumer消费者springboot子工程
  2. 引入依赖(父工程引过了所以可以不用引)
  3. 配置yml文件
    logging:
      pattern:
        dateformat: MM-dd HH:mm:ss:SSS
    spring:
      rabbitmq:
        host: 192.168.65.129 # rabbitMQ的ip地址
        port: 5672 # 端口
        username: itcast
        password: 123321
        virtual-host: /
  4. 创建一个监听类,这里取名listener,

    @Component  //注入为spring的一个bean,好让spring监控到
    public class SpringRabbitListener {
    
        @RabbitListener(queues = "simple.queue")  //这里是指定我们监听的是哪一个消息队列
        public void listenSimpleQueue(String msg){
            System.out.println("消费者接收到的消息是:"+msg);
        }
    }

    完成后启动application启动类,然后就可以接收消息了,如下

     注意:消费完后队列里的消息会被销毁,所以再去rabbitmq的队列里就看不见已经消费过的消息了

下面通过SpringAMQP创建做一个工作队列(work模型),就是将多个消费者绑定到同一队列,同一条消息只会被一个消费者处理

任务:在一秒内由p端发送50条数据,C1处理速度是40条每秒,C2是10条每秒,让两个消费者协同处理

  1. 在发送端写好发送代码
      @Test
        public void WorkQueue() throws InterruptedException {
            String queueName="simple.queue";
            String message="工作队列";
            for (int i = 0; i <50 ; i++) {
                rabbitTemplate.convertAndSend(queueName,message+i);
                Thread.sleep(20);//这里表示50条数据在一秒钟发完
            }
  2. 在消费者端写好两个消费者代码

    @RabbitListener(queues = "simple.queue")
        public void listenWorkQueue1(String msg) throws InterruptedException {
            System.out.println("消费者1接收到的消息是:"+msg+LocalTime.now());
            Thread.sleep(25);//表示消费者1每秒处理40个消息
        }
        @RabbitListener(queues = "simple.queue")
        public void listenWorkQueue2(String msg) throws InterruptedException {
            System.err.println("消费者2接收到的消息是:"+msg+LocalTime.now());
            Thread.sleep(100); //表示消费者2每秒处理10个消息
        }

    这里要注意的是,rabbitmq默认才是消息预取的方式,就是两个消费者都先把所有消息从队列里拿过来再处理,如果有50个消息,每个消费者就先拿25个,到手后再一起处理,如果两个消费者的消费能力不一样,这显然是有弊端的,所以需要进行如下配置

  3. 在yml文件做配置

    logging:
      pattern:
        dateformat: MM-dd HH:mm:ss:SSS
    spring:
      rabbitmq:
        host: 192.168.65.129 # rabbitMQ的ip地址
        port: 5672 # 端口
        username: itcast
        password: 123321
        virtual-host: /
        listener:
          simple:
            prefetch: 1   #这是新加的配置,表示每个消费者最多先预取1个消息,消费完后再取下一个,保证了每个消费者都根据自己的消费速度来处理消息
  4. 然后分别启动消息发送者和消费者,查看情况,如果电脑处理器正常一般就是消费者一接收了40条消息,消费者2接受了10条消息,耗时在一秒内

发布订阅模式案列学习

首先是广播模式案列实现,Fanout exchange会将收到的消息路由到每一个跟其绑定的queue

步骤如下:

  1. 在消息发送端写发送代码,以前都是直接发给队列,现在是发送给交换机,所以代码如下
    @Test
        public void testSendFanoutExchange() {
            // 交换机名称
            String exchangeName = "exchange01";
            // 消息
            String message = "hello, every one!";
            // 发送消息
            rabbitTemplate.convertAndSend(exchangeName, "", message);
        }
  2. 在消费者端新建一个配置类,用于建立交换机和队列名称,并将交换机和队列绑定,代码如下

    @Configuration//表示该类为配置类
    public class FanoutConfig {
        // 交换机有多种类型,这里是广播模式,所以声明FanoutExchange交换机名字叫做"exchange01"
        @Bean  //声明为一个bean交给spring管理
        public FanoutExchange fanoutExchange(){
            return new FanoutExchange("exchange01");
        }
    
        // 声明队列名字叫做fanout.queue1
        @Bean
        public Queue fanoutQueue1(){
            return new Queue("fanout.queue1");
        }
    
        // 绑定队列1到交换机
        @Bean
        public Binding fanoutBinding(Queue fanoutQueue1,FanoutExchange fanoutExchange){
            return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);//表示将fanoutQueue1队列绑定到fanoutExchange交换机
        }
    
        // fanout.queue2
        @Bean
        public Queue fanoutQueue2(){
            return new Queue("fanout.queue2");
        }
    
        // 绑定队列2到交换机
        @Bean
        public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
            return BindingBuilder
                    .bind(fanoutQueue2)
                    .to(fanoutExchange);
        }
    }
    
  3. 完成消费者代码的编写

    @RabbitListener(queues = "fanout.queue1")
        public void listenFanoutQueue1(String msg) {
            System.out.println("消费者1接收到fanout.queue1的消息是:"+"{"+msg+"}");
        }
    
        @RabbitListener(queues = "fanout.queue2")
        public void listenFanoutQueue2(String msg) {
            System.out.println("消费者2接收到fanout.queue2的消息是:"+"{"+msg+"}");
        }

    完成后重启服务,效果如下

如此便实现了一个消息被多个消费者消费的目的

路由模式(routes)

前面介绍的广播模式,可以将一个消息发送给多个与路由器绑定的队列;而现在介绍的路由模式,是把消息发送给与路由器绑定了相同key值的队列,实现了消息的指定发送,即选择性接收消息

发布者发送消息时,会指定一个RoutingKey,而队列则会绑定一个bindingKey,当这两个参数的值相同时,路由器才会将消息路由到队列中去,现在做一个案列,来实现消息的指定发送

  1. 这次先写发送端代码
     @Test
        public void testSendDirectExchange() {
            // 交换机名称
            String exchangeName = "direct exchange";
            // 消息
            String message = "发给所有队列";
            // 发送消息
            rabbitTemplate.convertAndSend(exchangeName, "common" , message);
        }
  2. 写接收端代码,这次不通过bean的方式来写,直接在注解上配置队列名字和路由器名字及类型

    //路由模式
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "direct.queue1"), //声明队列的名字
                exchange = @Exchange(name = "direct exchange", type = ExchangeTypes.DIRECT), //声明路由器的名字和路由器的类型,这里是direct类型
                key = {"common", "队列A的key"} //声明路由器的key值,这里声明一个共有的common和一个特有的”队列A的key“
        ))
        public void listenDirectQueue1(String msg){
            System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
        }
    
    @RabbitListener(bindings = @QueueBinding(
                value= @Queue(name = "direct.queue2"),
                exchange = @Exchange(name = "direct exchange",type = ExchangeTypes.DIRECT),
                key = {"common","队列B的key"}
        ))
        public void listenDirectQueue2(String msg){
            System.out.println("消费者接收到direct.queue2的消息"+"【" + msg + "】");
        }

    注意看接收端的代码,有一个key值是common,即是共有的key值,我们在发送端也写有common这个值,启动服务看效果

可以发现通过共有的key值,路由器把消息分别发送给了两个队列,由两个消费者端接收到了一样的消息,这和广播模式很像

现在修改routingKey值,改为某个队列特有的key,将key值改为"队列A的key",看效果

 现在就只有direct.queue1队列才能收到消息了,另外一个队列因为key值不同所有没能收到消息,由此实现了路由模式的选择性接收消息的效果

话题模式(Topic)

TopicExchange和DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割

并且在绑定时可以以#或*作为通配符

#:代指一个或多个单词

*:代指一个单词

话题模式就是要同一个话题的消息才能被接收,比如有中国天气和外国天气,中国军事和外国军事,这时如果设置routingKey为中国.#,则发送中国天气和中国军事这两个板块。如果设置为#.军事,则发送中国军事和外国军事这两个板块,下面举例

  1. 先写发送端
        @Test
        public void testSendTopicExchange() {
            // 交换机名称
            String exchangeName = "Topic exchange";
            // 消息
            String message = "这是中国军事频道";
            // 发送消息
            rabbitTemplate.convertAndSend(exchangeName, "china.junshi" , message);
        }
    
        @Test
        public void testSendTopicExchange2() {
            // 交换机名称
            String exchangeName = "Topic exchange";
            // 消息
            String message = "这是外国军事频道";
            // 发送消息
            rabbitTemplate.convertAndSend(exchangeName, "waiguo.junshi" , message);
        }
    }

  2.  写接收端代码,和路由模式差不多,就是bandingKey要改一下

        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name= "topce.queue1"),
                exchange = @Exchange(name= "Topic exchange" ,type = ExchangeTypes.TOPIC),
                key = "china.#"
        )
        )
        public void listenTopicQueue1(String msg){
            System.out.println("只接收到与china相关的消息,不管是什么内容"+"【" + msg + "】");
        }
    
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name= "topce.queue2"),
                exchange = @Exchange(name= "Topic exchange" ,type = ExchangeTypes.TOPIC),
                key = "#.junshi"
        )
        )
        public void listenTopicQueue2(String msg){
            System.out.println("只接收到junshi相关的消息,不管是哪个国家的"+"【" + msg + "】");
        }

    先写启动中国军事频道的测试类,会发现接收端所有中国的话题会报道中国军事,军事话题会报道军事

     再把发送端代码的中国军事改为天气

     @Test
        public void testSendTopicExchange() {
            // 交换机名称
            String exchangeName = "Topic exchange";
            // 消息
            String message = "今天中国气温适中";
            // 发送消息
            rabbitTemplate.convertAndSend(exchangeName, "china.tianqi" , message);
        }

    效果如下

     

消息转换器

在springAMQP的发送方法中,接收消息的类型是Object,也就是所我们可以发送任意对象的消息,SpringAMQP会自动序列化字节后发送

下面通过创建对象的方式,将对象从发送端传递到消费端,看看如何实现

  1. 在夫父工程引入相关依赖
    
                com.fasterxml.jackson.core
                jackson-databind
            
  2. 在消费者端配置类声明一个队列

     @Bean
        public Queue objectQueue(){
            return new Queue("object.queue");
        }
  3. 在消费者端写好接收代码

    @RabbitListener(queues = "object.queue")
        public void listenObjectQueue2(List msg){
            System.out.println("接收到object.queue的消息:" + msg);}
  4. 在发送端写发送代码,以对象形势发送,这里用list对象

       @Test
        public void testSendObject() {
            List list =new ArrayList();
            list.add("小鱼");
            list.add("大鱼");
            // 发送消息
            rabbitTemplate.convertAndSend( "object.queue",list);
        }
  5. 分别在发送端和接收端的启动类写转换工具的方法

     @Bean
        public MessageConverter messageConverter(){
            return new Jackson2JsonMessageConverter();
        }

    然后启动服务,效果如下

     

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5676890.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存