RabbitMq高级之如何保证消息能够成功消费

RabbitMq高级之如何保证消息能够成功消费,第1张

RabbitMq高级之如何保证消息能够成功消费 1.消息消费两种方式
  • 推的模式:MQ 主动将消息推送给消费者,消费者设置一个缓存区去消费消息,这种效率高,也是主要采用的方式
  • 拉的模式:消费者主动到MQ中主动拉取消息,很少使用.

案列:
推模式主要是通过@RabbitListener标记消费者

@Component
public class DirectReceiver {
  @RabbitListener(queues = "test_queue_two")
    public void processTwo(String testMessage) {
        System.out.println("队列test_queue_two消费者收到消息:"+testMessage);
    }
}

拉模式

@Test
public void testPull() throws UnsupportedEncodingException {
    Object o = rabbitTemplate.receiveAndConvert("test_queue_two");
    System.out.println("o = " + new String(((byte[]) o),"UTF-8"));
}
2.确保消息成功消费

RabbitMQ针对消息能够确保消费成功主要是提供了手动ACK机制和自动ACK机制

3.消息消费,消费者消息拒绝

客户端接受到消息时,可以选择消费,也可以选择消息拒绝。

    // 消息拒绝策略
    @RabbitListener(queues = "test_queue")
    public void handle(Channel channel, Message message) {
        //获取消息编号
        long tag = message.getMessageProperties().getDeliveryTag();  //getDeliveryTag 此消息处理通道的名字 一个标识
        try {
            //拒绝消息 第二个参数为此消息是否重新入队 如果为false表示此条消息就会被丢掉,不会有新的消费者消费它。
            channel.basicReject(deliveryTag, true);  //basicReject 方法一次只能拒绝一条消息
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
4.ack和nack机制 1.自动确认机制
    //自动确认消息 注意消息消费方法自带事务,如果方法抛出异常,那么消息回重新回到队列
    @RabbitListener(queues = "test_queue")//监听的队列名称 test_queue
    public void process(String testMessage) {
        System.out.println("DirectReceiver消费者收到消息  : " + testMessage.toString());
    }
2.手动确认机制

配置:

spring:
  application:
    name: mq-test
  rabbitmq:
    host: 101.201.236.14
    port: 5672
    publisher-returns: true
#   设置确认机制改为手动机制
    listener:
      simple:
        acknowledge-mode: manual
    // 手动设置消费者 是否消费此消息 ack机制和nack机制
    @RabbitListener(queues = "test_queue_two")
    public void handleTwo(Message message,Channel channel) {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            String s = new String(message.getBody());
            System.out.println("s = " + s);
            //消费完成后,手动 ack  注意第二个参数为false时,仅表示当前消息消费成功,如果为 true,则表示当前消息之前所有未被当前消费者确认的消息都消费成功
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            //手动 nack
            try {
                //消费失败,手动nack,第二个参数为false仅表示当前消息消费失败,如果为 true,则表示拒绝当前消息之前所有未被当前消费者确认的消息,第三个参数被拒绝的消息是否重新入队
                channel.basicNack(deliveryTag, false, true);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5636487.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存