【rabbitmq 】如何保证消息可靠性

【rabbitmq 】如何保证消息可靠性,第1张

【rabbitmq 】如何保证消息可靠性 概述


Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker

Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了 *** 作系统建立 TCP connection 的开销

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker

1、rabbitmq如何保证消息可靠

首先我们知道一个完整的结构涉及到生产者,mq,消费者这三部分,mq解除了系统之间的耦合,但是会出现一些问题,比如现在是三部分,它们都是独立的,每一部分都会存在消息丢失的情况,所以要从这三部分一起解决此问题。如下图。

1.1 mq保证消息不丢失

交换机队列,消息进行持久化,持久化之后mq断电重启,消息还会被继续消费

1.1.1交换机、队列持久化
@Configuration
public class MQConfig {
    //队列
    public  final static String /confirm/i_QUEUE = "/confirm/i1_queue";

    //交换机
    public  final static String /confirm/i_EXCHANGE = "/confirm/i1_exchange";

    //routingKey
    public  final static String ROUTINGKEY = "routing_key";

    
    @Bean
    public Queue /confirm/iQueue() {
        return new Queue(/confirm/i_QUEUE, true, false, false);
    }

    
    @Bean
    DirectExchange /confirm/iExchange() {
        return new DirectExchange(MQConfig./confirm/i_EXCHANGE);
    }

    
    @Bean
    Binding bindingDirectExchangeCA() {
        return BindingBuilder.bind(/confirm/iQueue()).to(/confirm/iExchange()).with(ROUTINGKEY);
    }
}
1.1.2 消息持久化

springboot集成的rabbitmq的持久化,其实默认就实现了,且看源码:

@Override
    public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
        convertAndSend(exchange, routingKey, object, (CorrelationData) null);
    }
  //点进去
 	@Override
	public void convertAndSend(String exchange, String routingKey, final Object object,
@Nullable CorrelationData correlationData) throws AmqpException {

		send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
	}
 
	protected Message convertMessageIfNecessary(final Object object) {
		if (object instanceof Message) {
			return (Message) object;
		}
		return getRequiredMessageConverter().toMessage(object, new MessageProperties());
	}

此时发现消息转换的时候,传入了一个MessageProperties对象

    public MessageProperties() {
        this.deliveryMode = DEFAULT_DELIVERY_MODE;
        this.priority = DEFAULT_PRIORITY;
    }
    
   public MessageProperties() {
        this.deliveryMode = DEFAULT_DELIVERY_MODE;
        this.priority = DEFAULT_PRIORITY;
    }
    
    static {
    //持久化
        DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
        DEFAULT_PRIORITY = 0;
    }

从 DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;看出springboot默认已经对消息进行了持久化

1.2 消费者消息不丢失

通过手动ack机制,当消费者成功将消息消费成功后,返回消息给mq,告诉mq你发送的消息我已经消费成功了,mq将队列中的消息进行删除

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

Channel.basicAck(用于肯定确认)
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了

yaml

rabbitmq:
    template:
      mandatory: true #指定消息在没有被队列接收时是否强行退回还是直接丢弃
    listener:
      simple:
        retry:
          ####开启消费者异常重试
          enabled: true
          ####最大重试次数
          max-attempts: 5
          ####重试间隔次数
          initial-interval: 5000
        ####开启手动ack
        acknowledge-mode: manual
1.3 生产者消息不丢失

发布确认机制,生产者通过回调可以得知发送的消息是否发送到交换机

yml文件:

 publisher-/confirm/i-type: correlated  # 发布消息成功到交换器后会触发回调方法
 publisher-returns: true #publisher-return模式可以在消息没有被路由到指定的queue时将消息返回,而不是丢弃
 template:
   mandatory: true #指定消息在没有被队列接收时是否强行退回还是直接丢弃

相关业务代码:

@Configuration
@Slf4j
public class RabbitConfigStudy implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnCallback {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.set/confirm/iCallback(this);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(this);
        return rabbitTemplate;
    }

    
    @Override
    public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到 id 为:{}的消息", id);
        } else {
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
        }
    }

    
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error(" 消 息 {}, 被 交 换 机 {} 退 回 , 退 回 原 因 :{}, 路 由 key:{}", new
                String(message.getBody()), exchange, replyText, routingKey);
        // spring_returned_message_correlation:该属性是指退回待确认消息的唯一标识 ,也就是id,可以作为更新表
        System.out.println("消息:" + message.getMessageProperties().getHeader("spring_returned_message_correlation").toString());
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5700977.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存