消息中间件rabbitMQ的可靠性处理

消息中间件rabbitMQ的可靠性处理,第1张

消息中间件rabbitMQ的可靠性处理

目录

        1.如何保证消息中间件的可靠性投递

2.关于生产者到exchange交换机的可靠性处理

3.关于交换机分配队列的可靠性处理

4.关于队列分配消费者的可靠性处理

5.消费端限流问题


1.如何保证消息中间件的可靠性投递

首先,我们来看一下,rabbitMQ的具体流程

生产者将消息投递到交换机中,由交换机路由转发队列,消费者再从队列中抽取消息,在实际生产环境中,我们要保证每次生产者的消息都要有消费者去处理,这就涉及到关于消息中间件的可靠性问题,大致分为以下几个部分,在rabbitMQ中,有相对的处理方式来保证每个环节的消息可靠性

1.由生产者到exchange交换机

2.由交换机分配到队列。

3.队列排列消息不作丢失

4.由队列分配至消费者处理

2.关于生产者到exchange交换机的可靠性处理

关于机制:/confirm/i确认机制

在使用RabbitMQ的时候,作为消息的发送方希望杜绝任何消息丢失或者投递失败的场景,RabbitMQ为我们提供了两种方式来控制消息的可靠性投递。

/confirm/i确认模式

return退回模式

在默认使用rabbitMQ中,默认关闭了/confirm/i模式,所以在使用确认机制的时候,第一步我们要开启确认模式(yml配置文件)

spring:
  rabbitmq:
    #开启确认模式
    publisher-/confirm/i-type: correlated

然后即可开始设置/confirm/i回调函数

//设置开启/confirm/i模式
//为rabbitTemplate设置/confirm/iCallback回调函数
public void testConfirm(){
    //为rabbitTemplate设置/confirm/i回调函数
    rabbitTemplate.setConfirmCallback(new ConfirmCallback(){
      //b:如果从生产者发送消息到交换机成果返回true,失败返回false
      //s:发送失败的原因
      @Override
      public void confirm(CorrelationData correlationData,boolean b ,String s){
        if(b){
          System.out.println("此部分发送消息成功");
        }else{
          System.out.println("此部分发送消息失败并打印原因"+s);
        }
      }
    });
}

3.关于交换机分配队列的可靠性处理

关于机制:returning返回机制

通过设置ConnectionFactory的退回模式,当消息从exchange路由到queue失败后执行回调函数returnedMessage。

同样,首先设置退回模式的开启

spring:
  rabbitmq:
    #开启returns机制
    publisher-returns: ture

然后即可开始设置returning的回调函数

public void testReturning(){
  rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){
    //注:此方法只有交换机发送到队列失败时才会执行
    @Override
    public void returndeMessage(ReturnedMessage returnedMessage){
      //可以通过returnedMessage.getxxx获得消息内容或发送失败原因等等
      //获得消息具体内容
      returnedMessage.getMessage().getBody();
      System.out.println("此部分发送消息失败");
    }
  });
}

4.关于队列分配消费者的可靠性处理

关于机制:ack手动确认机制,即为消费者手动确认此消息。

老样子,第一步更改消费者的确认机制

spring:
  rabbitmq:
    listener:
      simple:
        #消费者手动确认消息
        acknowledge-mode: manual

将目光移至消费者的消息处理部分

@RabbitListener(queues={"xxx","xxx","xxx"})//监听的队列名
public void handler(Message msg,Channel channel){
  System.out.println("处理消息模块");
  //此时手动确认该消息--->队列就会把消息从队列中移除
  //long deliveryTag, 消息标识
  //boolean multiple,是否允许多次确认
  //channel.basicAck(long deliveryTag,boolean multiple);//信道手动确认
  long deliveryTag = msg.getMessageProperties().getDeliveryTag();//获得消息标识
  channel.basicAck(deliveryTag,true);//此处有异常,建议进行try catch处理
}

如果在手动状态下,不进行basicAck手动确认,那么在队列中会存在一条Unacked状态的消息。当然。如果程序出了异常,我们可以把这条消息退换给队列。方法如下

//boolean requeue, 是否把消息退换给队列
channel.basicNack(long deliveryTag,boolean multiple,boolean requeue)

你可以测试一下,在消费者中造个除零异常,可以体验一下拿到消息退回队列的死循环刷屏,当然,正常情况下不会出现这种情况,一般的退回原因都是因为网络或者其他问题。导致退回并重新处理该消息。

5.消费端限流问题

假设A系统每秒最大处理1000条请求,但是由于请求量太大,为了避免A系统从队列中拉取超出自身处理能力都情况出现,通常会采用消费端限流降低性能来换取稳定性,使得A系统分批次处理消息请求,从而设计到消费端限流问题

限流前提:必须消费者是手动确认模式。

开启限流模式:

spring:
  rabbitmq:
    listener:
      simple:
        #消费者手动确认消息
        acknowledge-mode: manual
        #设置消费者每次从队列中拉去消息的个数
        prefetch: x

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5717880.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存