应用解耦:提高系统容错性和可维护性
异步提速:提升用户体验和系统吞吐量
削峰填谷:提高系统稳定性
2.劣势系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一但MQ宕机,就会对业务造成影响。
- 如何保证MQ的高可用? 集群方式
系统的复杂度提高
MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。
- 如何保证消息没有被重复消费?ACK的消息确认机制
- 怎么处理消息丢失情况? ACK的消息确认机制
- 如何保证消息传递的顺序性? 可以迂回处理,多个消费者揉成一个。采用轮询机制处理
ACK的消息确认机制。
答:ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。
一致性问题
A系统处理完业务,通过MQ给B、C、D三个系统发消息数据。如果B、C系统处理成功,D系统处理失败。
- 如何保证消息数据处理的一致性?
一个生产者,一个消费者
二、 Work模式工作队列模式,采用默认的交换机,路由名称为队列名称,有多个终端消费同一个队列的时候,交换机采用轮询发送消息,通俗点说就是给第一个发一条,另外一个发下一条
应用场景:
对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
测试:
1、多个消费者。
2、生产者发送多个消息。
结果:
1、一条消息只会被一个消费者接收;
2、rabbit采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息。
发布订阅模式:
1、每个消费者监听自己的队列。
2、生产者将消息发给交换机,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
应用场景:
用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法 。
流程:
1.生产者
声明fanout类型交换机。
声明两个队列并且绑定到此交换机,绑定时不需要指定routingkey
发送消息时不需要指定routingkey
2.消费者
交换机会将信息发布给每个监听本交换机的队列
路由模式:
1、每个消费者监听自己的队列,并且设置routingkey。
2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列。
流程:
1.生产者
声明DIRECT类型交换机。
声明两个队列并且绑定到此交换机,绑定时需要指定routingkey
发送消息时需要指定routingkey
2.消费者
消费者绑定队列的时候可以指定routingkey来只获取指定routingkey的消息
说明:
Routing模式要求队列在绑定交换机时要指定routingkey,消息会转发到符合routingkey的队列。
同路由模式相似,但是routingkey的匹配是通过通配符决定的,路由模式是相等才匹配
设置交换机类型为Topics即可
应用场景:
根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种
通知类型都接收的则两种通知都有效。
通配符使用:
[#],匹配一个或者多个词,比如 uncle.# ,可以匹配uncle.sms、uncle.email、uncle.sms.email
[* ],匹配一个词,比如 uncle.*,可以匹配uncle.sms、uncle.email
例子:
uncle.#.sms.# 能匹配 uncle.sms、uncle.email.sms,不能匹配 uncle.email
RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:
1、客户端即是生产者就是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果
3、服务端将RPC方法 的结果发送到RPC响应队列
4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。
当消息成为 Dead Message后,可以被重新发送到另一个交换机,这个交换机就是死信交换机DLX
消息成为死信的三种情况
-
队列消息长度到达限制
-
消费者拒接消费消息,并且不重回队列
-
原队列存在消息过期设置,消息到达超时时间未被消费
使用场景
-
可以使用TTL+死信队列 组合实现延迟队列的效果
-
可以死信队列中在设置死信队列
生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX交换机 -->队列 --> 消费者
一、连接配置#容器类型.simple或direct spring.rabbitmq.listener.type=simple #该配置项是用来表示消息确认方式,其有三种配置方式,分别是none、manual和auto。 #none意味着没有任何的应答会被发送。 #manual意味着监听者必须通过调用Channel.basicAck()来告知所有的消息。 #auto意味着容器会自动应答,除非MessageListener抛出异常,这是默认配置方式。 spring.rabbitmq.listener.simple.acknowledge-mode=manual #该配置项是决定由于监听器抛出异常而拒绝的消息是否被重新放回队列。默认值为true。 #该属性配置为true表示会重新放回队列,如果配置为false表示不会放回队列。 spring.rabbitmq.listener.simple.default-requeue-rejected=false #NONE值是禁用发布确认模式,是默认值 #CORRELATED值是发布消息成功到交换器后会触发回调方法,如1示例 #SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitFor/confirm/is或waitFor/confirm/isOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitFor/confirm/isOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker; spring.rabbitmq.publisher-/confirm/i-type=correlated #可以确保消息在未被队列接收时返回 spring.rabbitmq.publisher-returns=true #指定消息在没有被队列接收时是否强行退回还是直接丢弃 spring.rabbitmq.template.mandatory=true #一个消费者最多可处理的nack消息数量,如果有事务的话,必须大于等于transaction数量 #每个消费者可最大处理的nack消息数量 spring.rabbitmq.listener.direct.prefetch=1000二、生产者消息可靠性传递 1.确认模式
//1.开启确认模式 //2.在rabbitTemplate 定义ConfirmCallback 回调函数 rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() { @Overr @Override public void /confirm/i(CorrelationData correlationData, boolean b, String s) { } });2.回退模式
//回退模式:当消息发送给交换机后,交换机路由到queue失败时,才会执行,ReturnCallBack //步骤 //1.开启回退模式 //2.设置ReturnCallBack函数 //3.设置交换机处理消息的模式 // 1.如果消息没有路由到queue,则丢弃消息(默认) // 2.如果消息没有路由到queue,返回给消息发送方ReturnCallBack //设置交换机处理失败消息的模式 rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { } });三、消费者消息可靠性传递 1.自动签收 2.手动签收
//拒绝签收 //第二个 boolean 表示是否重回队列。为true,则消息重回queue,broker会重新发送该消息给消费者 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); //正常手动签收 //是否签收多条消息,true为是 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);四、TTL过期时间
如果都设置了,以时间短的生效
1.队列统一过期//声明队列时,配置该参数,单位为毫秒 args.put("x-message-ttl",100000);2.消息单独过期
单独过期的消息,只有在队列顶端才会判断其是否过期
//消息后处理对象,设置一些消息的参数信息 MessagePostProcessor messagePostProcessor=new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //1.设置message的配置信息 //过期时间,毫秒单位 message.getMessageProperties().setExpiration("5000"); return message; } }; rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", msg,messagePostProcessor);五、springboot生产者配置
1.声明业务交换机
2.声明死信交换机
3.声明死信队列
4.声明业务队列
- 设置当前队列的死信交换机
- 设置当前队列的死信路由KEY
5.绑定业务队列和交换机关系
6.绑定死信队列和死信交换机关系
// 声明业务Exchange @Bean("businessExchange") public FanoutExchange businessExchange(){ return new FanoutExchange(BUSINESS_EXCHANGE_NAME); } // 声明业务队列A @Bean("businessQueueA") public Queue businessQueueA(){ Map六、springboot 消费者配置args = new HashMap<>(2); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY); //设置队列过期时间 args.put("x-message-ttl",100000); //设置队列长度 10条消息 args.put("x-max-length",10); return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build(); } // 声明死信队列A @Bean("deadLetterQueueA") public Queue deadLetterQueueA(){ return new Queue(DEAD_LETTER_QUEUEA_NAME); } // 声明业务队列A绑定关系 @Bean public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange){ return BindingBuilder.bind(queue).to(exchange); } // 声明死信队列A绑定关系 @Bean public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY); }
//设置监听队列 @RabbitListener(queues = BUSINESS_QUEUEA_NAME) public void receiveA(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("收到业务消息A:{}", msg); boolean ack = true; Exception exception = null; try { if (msg.contains("deadletter")){ throw new RuntimeException("dead letter exception"); } } catch (Exception e){ ack = false; exception = e; } if (!ack){ log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception); //拒绝签收 basicNack //第二个 boolean 表示是否重回队列。为true,则消息重回queue,broker会重新发送该消息给消费者 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } else { //手动正常签收basicAck //是否签收多条消息,true为是 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }七、应用问题 一、消息补偿(可靠性)
- 消费者收到消息,处理完毕后,调用生产者的API
消费多条相同的消息,得到与消费该消息一次相同的结果
数据库中,使用乐观锁机制
liveryTag(), false, false);
} else {
//手动正常签收basicAck
//是否签收多条消息,true为是
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
# 七、应用问题 ## 一、消息补偿(可靠性) - 消费者收到消息,处理完毕后,调用生产者的API [外链图片转存中...(img-uevDGpwY-1634552488751)] ## 二、幂等性 消费多条相同的消息,得到与消费该消息一次相同的结果 数据库中,使用乐观锁机制
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)