1 github:源码地址 2 环境-父工程(管理依赖版本)
4.0.0 org.springframework.boot spring-boot-starter-parent2.5.0 com.yzm rabbitmq0.0.1-SNAPSHOT pom rabbitmq Demo project for Spring Boot rabbitmq01 UTF-8 UTF-8 1.8 org.springframework.boot spring-boot-starter-weborg.springframework.boot spring-boot-starter-testtest org.springframework.boot spring-boot-starter-amqporg.projectlombok lombokorg.apache.commons commons-lang3com.alibaba fastjson1.2.62 org.apache.maven.plugins maven-compiler-plugin${java.version} ${project.build.sourceEncoding}
rabbitmq01 子工程
4.0.0 rabbitmq01 com.yzm rabbitmq0.0.1-SNAPSHOT ../pom.xml 0.0.1-SNAPSHOT jar rabbitmq01 Demo project for Spring Boot org.springframework.boot spring-boot-maven-plugin
项目结构
application.yml
spring: rabbitmq: port: 5672 host: 127.0.0.1 username: guest password: guest # listener: # simple: # acknowledge-mode: manual # prefetch: 13 基本消息模型
开启定时器功能、创建队列hello-world
package com.yzm.rabbitmq01.config; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; @Configuration @EnableScheduling public class RabbitConfig { public static final String HELLO_WORLD = "hello-world"; @Bean public Queue helloQueue() { return new Queue(HELLO_WORLD, true, false, false); } }
生产者定时生产消息
package com.yzm.rabbitmq01.service; import com.yzm.rabbitmq01.config.RabbitConfig; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @Component public class HelloSenderService { private final AmqpTemplate template; private int count = 1; public HelloSenderService(AmqpTemplate template) { this.template = template; } // 项目启动后,过5秒开始第一个任务执行,之后每过1秒执行一次任务 @Scheduled(fixedDelay = 1000, initialDelay = 5000) public void helloSend() { if (count <= 10) { String message = "Hello.........." + count++; template.convertAndSend(RabbitConfig.HELLO_WORLD, message); System.out.println(" [ 生产者 ] Sent ==> '" + message + "'"); } } }
消费者消费信息
package com.yzm.rabbitmq01.service; import com.yzm.rabbitmq01.config.RabbitConfig; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = RabbitConfig.HELLO_WORLD) public class HelloReceiverService { @RabbitHandler public void helloReceive(String message) { System.out.println(" [ 消费者 ] Received ==> '" + message + "'"); } }
4 消息确认机制(ack)启动项目,等待5秒时间,定时器启动,开始生产消息,控制台打印
同时RabbitMQ服务器上能看到hello-world队列正在允许
其中:
Ready:表示待消费数量;队列中拥有可以被消费者消费的消息数量
Unacked:表示待确认数量;对于队列来说,它只知道消费者在消费消息,在消费者未回复它之前,是不知道消息被消费完了没,所以就给该消息一个待确认状态;
Total:表示待消费数和待确认数的总和
上面的示例,由于生产者生产完消息就立即被消费者消费了,很难看出这三个值的变化
RabbitMQ默认的消息确认机制是:自动确认的 。
像上面的的示例,消费者只是消费了消息,并没有进行确认之类的 *** 作。
现在将消息确认改为:手动确认。
在application.yml中
spring: rabbitmq: port: 5672 host: 127.0.0.1 username: guest password: guest listener: simple: acknowledge-mode: manual # 开启手动确认,默认是auto # prefetch: 1
生产者、消费者不改,重启项目
消费者还是一样的消费消息(打印了),但服务器上显示的有10条消息未确认,因为消息是需要手动确认的,但我们消费者没确认。
如果我们停止项目,那么10条未确认的消息会回到Ready里面等待重新消费
不确认消息,那么消息会越来越多,再次重启项目
确认消息,修改消费者
package com.yzm.rabbitmq01.service; import com.rabbitmq.client.Channel; import com.yzm.rabbitmq01.config.RabbitConfig; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component //@RabbitListener(queues = RabbitConfig.HELLO_WORLD) public class HelloReceiverService { // @RabbitHandler // public void helloReceive(String message) { // System.out.println(" [ 消费者 ] Received ==> '" + message + "'"); // } @RabbitListener(queues = RabbitConfig.HELLO_WORLD) public void helloReceive(Message message, Channel channel) throws IOException { System.out.println(" [ 消费者 ] 接收到消息 ==> '" + new String(message.getBody())); // 确认消息 // 第一个参数,交付标签,相当于消息ID 64位的长整数 // 第二个参数,false表示仅确认提供的交付标签;true表示批量确认所有消息,包括提供的交付标签 //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
重启项目,前两次积累的消息先被消费完,接着生产的也被消费,服务器上的队列消息全被消费了
拒绝消息,修改消费者
@RabbitListener(queues = RabbitConfig.HELLO_WORLD) public void helloReceive(Message message, Channel channel) throws IOException { System.out.println(" [ 消费者 ] 接收到消息 ==> '" + new String(message.getBody())); // 确认消息 // 第一个参数,交付标签,相当于消息ID 64位的长整数 // 第二个参数,false表示仅确认提供的交付标签;true表示批量确认所有消息,包括提供的交付标签 //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息方式一 // 第一个参数,交付标签 // 第二个参数,false表示仅拒绝提供的交付标签;true表示批量拒绝所有消息,包括提供的交付标签 // 第三个参数,false表示直接丢弃消息,true表示重新排队 //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); // 拒绝消息方式二 // 第一个参数,交付标签 // 第二个参数,false表示直接丢弃消息,true表示重新排队 // 跟basicNack的区别就是始终只拒绝提供的交付标签 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); }
手动确认消息机制
未确认:什么也不用写,消息不会丢失,只会越来越多,重复消费
确认:确认后,消息从队列移除
拒绝:拒绝后,消息先从队列移除,然后可以选择重新排队,或者直接丢弃(丢弃还有一种选择,就是加入到死信队列中,用于追踪问题)
异常处理,修改消费者
package com.yzm.rabbitmq01.service; import com.rabbitmq.client.Channel; import com.yzm.rabbitmq01.config.RabbitConfig; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component //@RabbitListener(queues = RabbitConfig.HELLO_WORLD) public class HelloReceiverService { // @RabbitHandler // public void helloReceive(String message) { // System.out.println(" [ 消费者 ] Received ==> '" + message + "'"); // } @RabbitListener(queues = RabbitConfig.HELLO_WORLD) public void helloReceive(Message message, Channel channel) throws IOException { System.out.println(" [ 消费者 ] 接收到消息 ==> '" + new String(message.getBody())); // 制造异常 int i = 1 / 0; System.out.println("成功处理了消息"); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
在手动确认之前,抛出异常
运行到异常的代码,抛出异常,下面的代码不再执行,这样就相当于未确认了
继续修改代码
@RabbitListener(queues = RabbitConfig.HELLO_WORLD) public void helloReceive(Message message, Channel channel) throws IOException { System.out.println(" [ 消费者 ] 接收到消息 ==> '" + new String(message.getBody())); try { // 制造异常 int i = 1 / 0; System.out.println("成功处理了消息"); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 这里拒绝后,可以选择将异常消息发送到死信队列 System.out.println("有异常情况,将异常消息发送到死信队列,请尽快处理"); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } }
5 竞争消费者模式这种才是实际开发中的正常处理逻辑
创建队列work-queue
package com.yzm.rabbitmq01.config; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; @Configuration @EnableScheduling public class RabbitConfig { public static final String HELLO_WORLD = "hello-world"; @Bean public Queue helloQueue() { return new Queue(HELLO_WORLD, true, false, false); } //------------------------------------------------------------------------------------------------------------------ public static final String WORK_QUEUE = "work-queue"; @Bean public Queue workQueue() { return new Queue(WORK_QUEUE); } }
生产者
package com.yzm.rabbitmq01.service; import com.yzm.rabbitmq01.config.RabbitConfig; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.scheduling.annotation.Scheduled; @Component public class WorkSenderService { private final AmqpTemplate template; private int count = 1; public WorkSenderService(AmqpTemplate template) { this.template = template; } @Scheduled(fixedDelay = 500, initialDelay = 10000) public void workSend() { if (count <= 30) { String message = "Hello.........." + count++; template.convertAndSend(RabbitConfig.WORK_QUEUE, message); System.out.println(" [ 生产者 ] Sent ==> '" + message + "'"); } } }
消费者
package com.yzm.rabbitmq01.service; import com.yzm.rabbitmq01.config.RabbitConfig; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class WorkReceiverService { private int count = 1; private int count2 = 1; @RabbitListener(queues = RabbitConfig.WORK_QUEUE) public void workReceive(Message message, Channel channel) { try { System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'"); Thread.sleep(1000); System.out.println(" [ 消费者@1号 ] Dealt with:" + count++); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); } } @RabbitListener(queues = RabbitConfig.WORK_QUEUE) public void workReceive2(Message message, Channel channel) { try { System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'"); Thread.sleep(2000); System.out.println(" [ 消费者@2号 ] Dealt with:" + count2++); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); } } }
为了不影响本次测试,关闭上次的生产者定时任务
重启项目,运行结果如下
我们有2个消费者1、2号,1号每秒消费一个消息,2号每两秒消费一个消息,也就是说1号处理能力是2号的2倍;但队列分配消息默认是平均分配的,这样就会导致有的消费者处理快了就有空闲时间,而我们想要尽快的处理掉消息,需要处理快的多处理一些。解决方法有两种:
第一种:设置prefetch参数
spring: rabbitmq: port: 5672 host: 127.0.0.1 username: guest password: guest listener: simple: acknowledge-mode: manual prefetch: 1
重启项目,可以看到1号处理了20个,2号处理了10个
第二种:代码配置
修改配置类RabbitConfig
package com.yzm.rabbitmq01.config; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; @Configuration @EnableScheduling public class RabbitConfig { public static final String HELLO_WORLD = "hello-world"; @Bean public Queue helloQueue() { return new Queue(HELLO_WORLD, true, false, false); } //------------------------------------------------------------------------------------------------------------------ public static final String WORK_QUEUE = "work-queue"; public static final String PREFETCH_ONE = "prefetchOne"; @Bean public Queue workQueue() { return new Queue(WORK_QUEUE); } @Bean(name = PREFETCH_ONE) public RabbitListenerContainerFactoryprefetchOne(ConnectionFactory rabbitConnectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(rabbitConnectionFactory); factory.setPrefetchCount(1); return factory; } }
修改消费者
@RabbitListener(queues = RabbitConfig.WORK_QUEUE, containerFactory = RabbitConfig.PREFETCH_ONE) public void workReceive(Message message) { try { System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'"); Thread.sleep(1000); System.out.println(" [ 消费者@1号 ] Dealt with:" + count++); } catch (Exception e) { e.printStackTrace(); } } @RabbitListener(queues = RabbitConfig.WORK_QUEUE, containerFactory = RabbitConfig.PREFETCH_ONE) public void workReceive2(Message message) { try { System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'"); Thread.sleep(2000); System.out.println(" [ 消费者@2号 ] Dealt with:" + count2++); } catch (Exception e) { e.printStackTrace(); } }
注释掉第一种方法的 prefetch 参数,并开启自动确认机制(手动确认会报channel close错误)
重启项目,运行结果跟第一种是一样的
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)