提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档
- 前言
- 一、发布者确认
- 二、mq消息的持久化
- 三、消费者确定消费了消息
- 四、没消费到消息咋办
- 总结
前言
消息中间件在很多场景被广泛使用,伴随着众多的使用,在实际项目中会为我们的数据带来极大的不严谨,所以今天耗费我的LOL时间跟大家分享下我的理解,愿助君一臂之力,如果错误的地方,还望不吝指正
核心思路围绕:
1、发布者确认发送了消息
2、发送到mq的消息持久化
3、消费者确认消费了消息
4、没消费到消息咋办
一、发布者确认 发布者确认网络可能会以不太明显的方式发生故障,并且检测某些故障需要时间。因此,将协议帧或一组帧(例如已发布的消息)写入其套接字的客户端不能假定消息已到达服务器并已成功处理。它可能在途中丢失,或者其交付可能会大大延迟。
使用标准的AMQP 0-9-1,保证消息不会丢失的唯一方法是使用事务 - 使通道事务化,然后为每个消息或消息集发布,提交。在这种情况下,事务是不必要的重量级事务,并且会将吞吐量降低 250 倍。为了解决这个问题,引入了一种确认机制。它模仿协议中已经存在的消费者确认机制。
若要启用 confirms,客户端将发送 confirm.select 方法。根据是否设置了“无等待”,代理可能会使用 confirm.select-ok 进行响应。一旦在通道上使用 confirm.select 方法,就说它处于确认模式。事务通道不能进入确认模式,一旦通道进入确认模式,就不能将其设置为事务性通道。
一旦通道处于确认模式,代理和客户端都会对消息进行计数(计数从第一个 confirm.select 的 1 开始)。然后,代理通过在同一通道上发送 basic.ack 来确认消息,因为它在处理消息时会确认消息。传递标记字段包含已确认邮件的序列号。代理还可以在 basic.ack 中设置多个字段,以指示所有消息(包括具有序列号的消息)都已处理。
对已发布的负面确认在异常情况下,当代理无法成功处理消息时,代理将发送 basic.nack,而不是 basic.ack。在此上下文中,basic.nack 的字段与 basic.ack 中的相应字段具有相同的含义,应忽略重新排队字段。通过确认一条或多条消息,代理表明它无法处理消息并拒绝对它们负责;此时,客户端可以选择重新发布消息。
将频道置于确认模式后,所有随后发布的消息将被确认或取消一次。不保证消息确认的时间。不会同时确认和确认任何消息。
basic.nack 只有在负责队列的 Erlang 进程中发生内部错误时才会传递。
1.Callback确保消息发送到交换机
@Test
public void EDG(){
String str = "一轮明月正在冉冉升起";
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
correlationData.getFuture().addCallback(new SuccessCallback() {
@Override
public void onSuccess(CorrelationData.Confirm result) {
if (result.isAck()) {
System.out.println("当前" + correlationData.getId() + "消息发送成功");
} else {
System.out.println("当前" + correlationData.getId() + "消息发送失败" + "失败原因" + result.getReason());
}
}
}, new FailureCallback() {
@Override
public void onFailure(Throwable ex) {
throw new RuntimeException(ex);
}
});
rabbitTemplate.convertAndSend("vv-exchange","vv-routingkey",str,correlationData);
System.out.println("消息发送成功");
}
开始测试:
2.retrunCallback
交换机路由到队列,这里spring创建bean的时候留给了我们自定义的方法,其中有一个接口:ReturnCallback,
还记得之前spring创建bean的流程,实例化bean--->填充属性---->初始化
spring对内置bean属性填充使用Aware,而我们知道了自己的bean名称RabbitTemplate,那简单了直接去spring ioc容器中拿到这个bean,然后定制我想要的效果
二、mq的消息持久化
这里是spring比较贴心,已经帮我们默认消息的持久化
三、消费者确认消费了消息
spring又很贴心,他提供了几个选项给我们
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto
四、没消费到消息咋办
消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:
我们需要在yaml文件中配置失败重试机制
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000
multiplier: 1
max-attempts: 3
stateless: true
这里重试3次之后mq还是会删除掉,我们还是要考虑重试之后,该消息还没被消费咋办!
amqp为我们准备了消息回收器接口!一共三个感兴趣的话跟进去看下
package cn.vv.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 威威财神
* @date 2022/4/24 周日
*/
@Configuration
public class TheSpareTireConfig {
@Bean
public Queue theSpareTireQueue(){
return new Queue("theSpareTire.queue", true);
}
@Bean
public DirectExchange directExchange(){
return new DirectExchange("theSpareTire.exchange",true,false);
}
public Binding binding(){
return BindingBuilder.bind(theSpareTireQueue()).to(directExchange()).with("theSpareTire");
}
/**
* 重试失败后就将失败消息投递到指定的交换机
*
* @return {@link RepublishMessageRecoverer }
* @Author 威威财神
* @Date 2022/4/24
*/
public RepublishMessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate,"theSpareTire.exchange","theSpareTire");
}
}
这里我使用RepublishMessageRecoverer,重试失败后就将失败消息投递到指定的交换机
总结-
开启生产者确认机制,确保生产者的消息能到达队列
-
开启持久化功能,确保消息未消费前在队列中不会丢失
-
开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
-
开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)