Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的Connection 极大减少了 *** 作系统建立 TCP connection 的开销
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue:消息最终被送到这里等待 consumer 取走
Binding:exchange 和queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
1、rabbitmq如何保证消息可靠首先我们知道一个完整的结构涉及到生产者,mq,消费者这三部分,mq解除了系统之间的耦合,但是会出现一些问题,比如现在是三部分,它们都是独立的,每一部分都会存在消息丢失的情况,所以要从这三部分一起解决此问题。如下图。
1.1 mq保证消息不丢失对交换机,队列,消息进行持久化,持久化之后mq断电重启,消息还会被继续消费
1.1.1交换机、队列持久化@Configuration public class MQConfig { //队列 public final static String /confirm/i_QUEUE = "/confirm/i1_queue"; //交换机 public final static String /confirm/i_EXCHANGE = "/confirm/i1_exchange"; //routingKey public final static String ROUTINGKEY = "routing_key"; @Bean public Queue /confirm/iQueue() { return new Queue(/confirm/i_QUEUE, true, false, false); } @Bean DirectExchange /confirm/iExchange() { return new DirectExchange(MQConfig./confirm/i_EXCHANGE); } @Bean Binding bindingDirectExchangeCA() { return BindingBuilder.bind(/confirm/iQueue()).to(/confirm/iExchange()).with(ROUTINGKEY); } }1.1.2 消息持久化
springboot集成的rabbitmq的持久化,其实默认就实现了,且看源码:
@Override public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException { convertAndSend(exchange, routingKey, object, (CorrelationData) null); } //点进去 @Override public void convertAndSend(String exchange, String routingKey, final Object object, @Nullable CorrelationData correlationData) throws AmqpException { send(exchange, routingKey, convertMessageIfNecessary(object), correlationData); } protected Message convertMessageIfNecessary(final Object object) { if (object instanceof Message) { return (Message) object; } return getRequiredMessageConverter().toMessage(object, new MessageProperties()); }
此时发现消息转换的时候,传入了一个MessageProperties对象
public MessageProperties() { this.deliveryMode = DEFAULT_DELIVERY_MODE; this.priority = DEFAULT_PRIORITY; } public MessageProperties() { this.deliveryMode = DEFAULT_DELIVERY_MODE; this.priority = DEFAULT_PRIORITY; } static { //持久化 DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT; DEFAULT_PRIORITY = 0; }
从 DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;看出springboot默认已经对消息进行了持久化
1.2 消费者消息不丢失通过手动ack机制,当消费者成功将消息消费成功后,返回消息给mq,告诉mq你发送的消息我已经消费成功了,mq将队列中的消息进行删除
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
Channel.basicAck(用于肯定确认)
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
yaml
rabbitmq: template: mandatory: true #指定消息在没有被队列接收时是否强行退回还是直接丢弃 listener: simple: retry: ####开启消费者异常重试 enabled: true ####最大重试次数 max-attempts: 5 ####重试间隔次数 initial-interval: 5000 ####开启手动ack acknowledge-mode: manual1.3 生产者消息不丢失
发布确认机制,生产者通过回调可以得知发送的消息是否发送到交换机
yml文件:
publisher-/confirm/i-type: correlated # 发布消息成功到交换器后会触发回调方法 publisher-returns: true #publisher-return模式可以在消息没有被路由到指定的queue时将消息返回,而不是丢弃 template: mandatory: true #指定消息在没有被队列接收时是否强行退回还是直接丢弃
相关业务代码:
@Configuration @Slf4j public class RabbitConfigStudy implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnCallback { @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.set/confirm/iCallback(this); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(this); return rabbitTemplate; } @Override public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : ""; if (ack) { log.info("交换机已经收到 id 为:{}的消息", id); } else { log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error(" 消 息 {}, 被 交 换 机 {} 退 回 , 退 回 原 因 :{}, 路 由 key:{}", new String(message.getBody()), exchange, replyText, routingKey); // spring_returned_message_correlation:该属性是指退回待确认消息的唯一标识 ,也就是id,可以作为更新表 System.out.println("消息:" + message.getMessageProperties().getHeader("spring_returned_message_correlation").toString()); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)