RabbitMQ 入门篇02

RabbitMQ 入门篇02,第1张

RabbitMQ 入门篇02 功能

1.基本消息模型
2.消息确认机制(ack)
3.竞争消费者模式

1 github:源码地址 2 环境-父工程(管理依赖版本)


    4.0.0

    
        org.springframework.boot
        spring-boot-starter-parent
        2.5.0
         
    

    com.yzm
    rabbitmq
    0.0.1-SNAPSHOT
    pom
    rabbitmq
    Demo project for Spring Boot

    
        rabbitmq01
    

    
        UTF-8
        UTF-8
        1.8
    

    
        
            org.springframework.boot
            spring-boot-starter-web
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        

        
        
            org.springframework.boot
            spring-boot-starter-amqp
        

        
        
            org.projectlombok
            lombok
        

        
        
            org.apache.commons
            commons-lang3
        

        
        
            com.alibaba
            fastjson
            1.2.62
        
    

    
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                
                    ${java.version}
                    ${java.version}
                    ${project.build.sourceEncoding}
                
            
        
    



rabbitmq01 子工程



    4.0.0

    
        com.yzm
        rabbitmq
        0.0.1-SNAPSHOT
        ../pom.xml 
    

    rabbitmq01
    0.0.1-SNAPSHOT
    jar
    rabbitmq01
    Demo project for Spring Boot

    

    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    


项目结构

application.yml

spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
#    listener:
#      simple:
#        acknowledge-mode: manual
#        prefetch: 1
3 基本消息模型

开启定时器功能、创建队列hello-world

package com.yzm.rabbitmq01.config;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling
public class RabbitConfig {

    public static final String HELLO_WORLD = "hello-world";

    
    @Bean
    public Queue helloQueue() {
        return new Queue(HELLO_WORLD, true, false, false);
    }
    
}

生产者定时生产消息

package com.yzm.rabbitmq01.service;

import com.yzm.rabbitmq01.config.RabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;


@Component
public class HelloSenderService {

    private final AmqpTemplate template;
    private int count = 1;

    public HelloSenderService(AmqpTemplate template) {
        this.template = template;
    }

    // 项目启动后,过5秒开始第一个任务执行,之后每过1秒执行一次任务
    @Scheduled(fixedDelay = 1000, initialDelay = 5000)
    public void helloSend() {
        if (count <= 10) {
            String message = "Hello.........." + count++;
            template.convertAndSend(RabbitConfig.HELLO_WORLD, message);
            System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
        }
    }
}

消费者消费信息

package com.yzm.rabbitmq01.service;

import com.yzm.rabbitmq01.config.RabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
@RabbitListener(queues = RabbitConfig.HELLO_WORLD)
public class HelloReceiverService {

    @RabbitHandler
    public void helloReceive(String message) {
        System.out.println(" [ 消费者 ] Received ==> '" + message + "'");
    }
}

启动项目,等待5秒时间,定时器启动,开始生产消息,控制台打印
同时RabbitMQ服务器上能看到hello-world队列正在允许

其中:
Ready:表示待消费数量;队列中拥有可以被消费者消费的消息数量
Unacked:表示待确认数量;对于队列来说,它只知道消费者在消费消息,在消费者未回复它之前,是不知道消息被消费完了没,所以就给该消息一个待确认状态;
Total:表示待消费数和待确认数的总和
上面的示例,由于生产者生产完消息就立即被消费者消费了,很难看出这三个值的变化

4 消息确认机制(ack)

RabbitMQ默认的消息确认机制是:自动确认的 。
像上面的的示例,消费者只是消费了消息,并没有进行确认之类的 *** 作。
现在将消息确认改为:手动确认。
在application.yml中

spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: manual # 开启手动确认,默认是auto
#        prefetch: 1

生产者、消费者不改,重启项目

消费者还是一样的消费消息(打印了),但服务器上显示的有10条消息未确认,因为消息是需要手动确认的,但我们消费者没确认。
如果我们停止项目,那么10条未确认的消息会回到Ready里面等待重新消费

不确认消息,那么消息会越来越多,再次重启项目

确认消息,修改消费者

package com.yzm.rabbitmq01.service;

import com.rabbitmq.client.Channel;
import com.yzm.rabbitmq01.config.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;


@Component
//@RabbitListener(queues = RabbitConfig.HELLO_WORLD)
public class HelloReceiverService {

//    @RabbitHandler
//    public void helloReceive(String message) {
//        System.out.println(" [ 消费者 ] Received ==> '" + message + "'");
//    }

    
    @RabbitListener(queues = RabbitConfig.HELLO_WORLD)
    public void helloReceive(Message message, Channel channel) throws IOException {
        System.out.println(" [ 消费者 ] 接收到消息 ==> '" + new String(message.getBody()));

        // 确认消息
        // 第一个参数,交付标签,相当于消息ID 64位的长整数
        // 第二个参数,false表示仅确认提供的交付标签;true表示批量确认所有消息,包括提供的交付标签
        //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

重启项目,前两次积累的消息先被消费完,接着生产的也被消费,服务器上的队列消息全被消费了

拒绝消息,修改消费者

	
    @RabbitListener(queues = RabbitConfig.HELLO_WORLD)
    public void helloReceive(Message message, Channel channel) throws IOException {
        System.out.println(" [ 消费者 ] 接收到消息 ==> '" + new String(message.getBody()));

        // 确认消息
        // 第一个参数,交付标签,相当于消息ID 64位的长整数
        // 第二个参数,false表示仅确认提供的交付标签;true表示批量确认所有消息,包括提供的交付标签
        //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        // 拒绝消息方式一
        // 第一个参数,交付标签
        // 第二个参数,false表示仅拒绝提供的交付标签;true表示批量拒绝所有消息,包括提供的交付标签
        // 第三个参数,false表示直接丢弃消息,true表示重新排队
        //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

        // 拒绝消息方式二
        // 第一个参数,交付标签
        // 第二个参数,false表示直接丢弃消息,true表示重新排队
        // 跟basicNack的区别就是始终只拒绝提供的交付标签
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    }

手动确认消息机制
未确认:什么也不用写,消息不会丢失,只会越来越多,重复消费
确认:确认后,消息从队列移除
拒绝:拒绝后,消息先从队列移除,然后可以选择重新排队,或者直接丢弃(丢弃还有一种选择,就是加入到死信队列中,用于追踪问题)

异常处理,修改消费者

package com.yzm.rabbitmq01.service;

import com.rabbitmq.client.Channel;
import com.yzm.rabbitmq01.config.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;


@Component
//@RabbitListener(queues = RabbitConfig.HELLO_WORLD)
public class HelloReceiverService {

//    @RabbitHandler
//    public void helloReceive(String message) {
//        System.out.println(" [ 消费者 ] Received ==> '" + message + "'");
//    }

    
    
    
    @RabbitListener(queues = RabbitConfig.HELLO_WORLD)
    public void helloReceive(Message message, Channel channel) throws IOException {
        System.out.println(" [ 消费者 ] 接收到消息 ==> '" + new String(message.getBody()));
        // 制造异常
        int i = 1 / 0;
        System.out.println("成功处理了消息");
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

在手动确认之前,抛出异常

运行到异常的代码,抛出异常,下面的代码不再执行,这样就相当于未确认了
继续修改代码

@RabbitListener(queues = RabbitConfig.HELLO_WORLD)
    public void helloReceive(Message message, Channel channel) throws IOException {
        System.out.println(" [ 消费者 ] 接收到消息 ==> '" + new String(message.getBody()));
        try {
            // 制造异常
            int i = 1 / 0;
            System.out.println("成功处理了消息");
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 这里拒绝后,可以选择将异常消息发送到死信队列
            System.out.println("有异常情况,将异常消息发送到死信队列,请尽快处理");
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

这种才是实际开发中的正常处理逻辑

5 竞争消费者模式

创建队列work-queue

package com.yzm.rabbitmq01.config;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling
public class RabbitConfig {

    public static final String HELLO_WORLD = "hello-world";

    
    @Bean
    public Queue helloQueue() {
        return new Queue(HELLO_WORLD, true, false, false);
    }

    //------------------------------------------------------------------------------------------------------------------

    public static final String WORK_QUEUE = "work-queue";

    @Bean
    public Queue workQueue() {
        return new Queue(WORK_QUEUE);
    }
}

生产者

package com.yzm.rabbitmq01.service;

import com.yzm.rabbitmq01.config.RabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.scheduling.annotation.Scheduled;


@Component
public class WorkSenderService {

    private final AmqpTemplate template;
    private int count = 1;

    public WorkSenderService(AmqpTemplate template) {
        this.template = template;
    }

    @Scheduled(fixedDelay = 500, initialDelay = 10000)
    public void workSend() {
        if (count <= 30) {
            String message = "Hello.........." + count++;
            template.convertAndSend(RabbitConfig.WORK_QUEUE, message);
            System.out.println(" [ 生产者 ] Sent ==> '" + message + "'");
        }
    }
}

消费者

package com.yzm.rabbitmq01.service;

import com.yzm.rabbitmq01.config.RabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class WorkReceiverService {

    private int count = 1;
    private int count2 = 1;

    @RabbitListener(queues = RabbitConfig.WORK_QUEUE)
    public void workReceive(Message message, Channel channel) {
        try {
            System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
            Thread.sleep(1000);
            System.out.println(" [ 消费者@1号 ] Dealt with:" + count++);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @RabbitListener(queues = RabbitConfig.WORK_QUEUE)
    public void workReceive2(Message message, Channel channel) {
        try {
            System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
            Thread.sleep(2000);
            System.out.println(" [ 消费者@2号 ] Dealt with:" + count2++);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

为了不影响本次测试,关闭上次的生产者定时任务

重启项目,运行结果如下

我们有2个消费者1、2号,1号每秒消费一个消息,2号每两秒消费一个消息,也就是说1号处理能力是2号的2倍;但队列分配消息默认是平均分配的,这样就会导致有的消费者处理快了就有空闲时间,而我们想要尽快的处理掉消息,需要处理快的多处理一些。解决方法有两种:
第一种:设置prefetch参数

spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 1 

重启项目,可以看到1号处理了20个,2号处理了10个

第二种:代码配置
修改配置类RabbitConfig

package com.yzm.rabbitmq01.config;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;

@Configuration
@EnableScheduling
public class RabbitConfig {

    public static final String HELLO_WORLD = "hello-world";

    
    @Bean
    public Queue helloQueue() {
        return new Queue(HELLO_WORLD, true, false, false);
    }

    //------------------------------------------------------------------------------------------------------------------

    public static final String WORK_QUEUE = "work-queue";
    public static final String PREFETCH_ONE = "prefetchOne";

    @Bean
    public Queue workQueue() {
        return new Queue(WORK_QUEUE);
    }

    @Bean(name = PREFETCH_ONE)
    public RabbitListenerContainerFactory prefetchOne(ConnectionFactory rabbitConnectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(rabbitConnectionFactory);
        factory.setPrefetchCount(1);
        return factory;
    }
}

修改消费者

	@RabbitListener(queues = RabbitConfig.WORK_QUEUE, containerFactory = RabbitConfig.PREFETCH_ONE)
    public void workReceive(Message message) {
        try {
            System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
            Thread.sleep(1000);
            System.out.println(" [ 消费者@1号 ] Dealt with:" + count++);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    @RabbitListener(queues = RabbitConfig.WORK_QUEUE, containerFactory = RabbitConfig.PREFETCH_ONE)
    public void workReceive2(Message message) {
        try {
            System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
            Thread.sleep(2000);
            System.out.println(" [ 消费者@2号 ] Dealt with:" + count2++);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

注释掉第一种方法的 prefetch 参数,并开启自动确认机制(手动确认会报channel close错误)

重启项目,运行结果跟第一种是一样的

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5069221.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-16
下一篇 2022-11-16

发表评论

登录后才能评论

评论列表(0条)

保存