目录
1.如何保证消息中间件的可靠性投递
2.关于生产者到exchange交换机的可靠性处理
3.关于交换机分配队列的可靠性处理
4.关于队列分配消费者的可靠性处理
5.消费端限流问题
1.如何保证消息中间件的可靠性投递
首先,我们来看一下,rabbitMQ的具体流程
生产者将消息投递到交换机中,由交换机路由转发队列,消费者再从队列中抽取消息,在实际生产环境中,我们要保证每次生产者的消息都要有消费者去处理,这就涉及到关于消息中间件的可靠性问题,大致分为以下几个部分,在rabbitMQ中,有相对的处理方式来保证每个环节的消息可靠性
1.由生产者到exchange交换机
2.由交换机分配到队列。
3.队列排列消息不作丢失
4.由队列分配至消费者处理
2.关于生产者到exchange交换机的可靠性处理关于机制:/confirm/i确认机制
在使用RabbitMQ的时候,作为消息的发送方希望杜绝任何消息丢失或者投递失败的场景,RabbitMQ为我们提供了两种方式来控制消息的可靠性投递。
/confirm/i确认模式
return退回模式
在默认使用rabbitMQ中,默认关闭了/confirm/i模式,所以在使用确认机制的时候,第一步我们要开启确认模式(yml配置文件)
spring: rabbitmq: #开启确认模式 publisher-/confirm/i-type: correlated
然后即可开始设置/confirm/i回调函数
//设置开启/confirm/i模式 //为rabbitTemplate设置/confirm/iCallback回调函数 public void testConfirm(){ //为rabbitTemplate设置/confirm/i回调函数 rabbitTemplate.setConfirmCallback(new ConfirmCallback(){ //b:如果从生产者发送消息到交换机成果返回true,失败返回false //s:发送失败的原因 @Override public void confirm(CorrelationData correlationData,boolean b ,String s){ if(b){ System.out.println("此部分发送消息成功"); }else{ System.out.println("此部分发送消息失败并打印原因"+s); } } }); }3.关于交换机分配队列的可靠性处理
关于机制:returning返回机制
通过设置ConnectionFactory的退回模式,当消息从exchange路由到queue失败后执行回调函数returnedMessage。
同样,首先设置退回模式的开启
spring: rabbitmq: #开启returns机制 publisher-returns: ture
然后即可开始设置returning的回调函数
public void testReturning(){ rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){ //注:此方法只有交换机发送到队列失败时才会执行 @Override public void returndeMessage(ReturnedMessage returnedMessage){ //可以通过returnedMessage.getxxx获得消息内容或发送失败原因等等 //获得消息具体内容 returnedMessage.getMessage().getBody(); System.out.println("此部分发送消息失败"); } }); }4.关于队列分配消费者的可靠性处理
关于机制:ack手动确认机制,即为消费者手动确认此消息。
老样子,第一步更改消费者的确认机制
spring: rabbitmq: listener: simple: #消费者手动确认消息 acknowledge-mode: manual
将目光移至消费者的消息处理部分
@RabbitListener(queues={"xxx","xxx","xxx"})//监听的队列名 public void handler(Message msg,Channel channel){ System.out.println("处理消息模块"); //此时手动确认该消息--->队列就会把消息从队列中移除 //long deliveryTag, 消息标识 //boolean multiple,是否允许多次确认 //channel.basicAck(long deliveryTag,boolean multiple);//信道手动确认 long deliveryTag = msg.getMessageProperties().getDeliveryTag();//获得消息标识 channel.basicAck(deliveryTag,true);//此处有异常,建议进行try catch处理 }
如果在手动状态下,不进行basicAck手动确认,那么在队列中会存在一条Unacked状态的消息。当然。如果程序出了异常,我们可以把这条消息退换给队列。方法如下
//boolean requeue, 是否把消息退换给队列 channel.basicNack(long deliveryTag,boolean multiple,boolean requeue)
你可以测试一下,在消费者中造个除零异常,可以体验一下拿到消息退回队列的死循环刷屏,当然,正常情况下不会出现这种情况,一般的退回原因都是因为网络或者其他问题。导致退回并重新处理该消息。
5.消费端限流问题假设A系统每秒最大处理1000条请求,但是由于请求量太大,为了避免A系统从队列中拉取超出自身处理能力都情况出现,通常会采用消费端限流降低性能来换取稳定性,使得A系统分批次处理消息请求,从而设计到消费端限流问题
限流前提:必须消费者是手动确认模式。
开启限流模式:
spring: rabbitmq: listener: simple: #消费者手动确认消息 acknowledge-mode: manual #设置消费者每次从队列中拉去消息的个数 prefetch: x
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)