RabbitMQ的消息应答机制和轮询策略

RabbitMQ的消息应答机制和轮询策略,第1张

RabbitMQ的消息应答机制和轮询策略

目录

消息应答

消息自动应答消息手动应答介绍

消息应答方法

消息应答里的Multiple(批量应答)参数的解释 消息自动重新入队 消费手动应答代码 不公平分发预取值

消息应答

消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。这个默认是公平分发。

消息自动应答

如果配置的是自动应答,那么消息发送后立即被认为已经传送成功,如果程序这时候突然宕机,那么消息直接丢失,这是很不安全的。所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用

消息手动应答介绍 消息应答方法
A.Channel.basicAck(用于肯定确认) RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
B.Channel.basicNack(用于否定确认)
C.Channel.basicReject(用于否定确认) 与 Channel.basicNack 相比少一个参数 不处理该消息了直接拒绝,可以将其丢弃了
消息应答里的Multiple(批量应答)参数的解释


消息自动重新入队

当消费者二号还没接收到信息A就宕机时,信息A会去消费者一号重新排队,然后被消费者一号接受

消费手动应答代码

睡眠工具类

public class SleepUtils {
    public static void sleep(int second) {
        try {
            Thread.sleep(1000 * second);
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}

生产者代码

public class Task02 {
    private static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] argv) throws Exception {

            Channel channel = RabbitMqUtils.getChannel();

            //生成队列
            channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
            //从控制台接收信息
            Scanner sc = new Scanner(System.in);
            System.out.println("请输入信息");
            while (sc.hasNext()) {
                
                String message = sc.nextLine();
                
                channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
                
                System.out.println("生产者发出消息" + message);
            }
        
    }
}

消费者一号(快接收)

public class Work03 {
    private static final String ACK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        //获取信道
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1 等待接收消息处理时间较短");

        //消息消费的时候如何处理消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            //睡一秒
            SleepUtils.sleep(1);
            System.out.println("接收到消息:" + message);
            
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        //采用手动应答
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        });
    }
}

消费者二号(慢接收)

public class Work04 {
    private static final String ACK_QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        //接受信道
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C2 等待接收消息处理时间较长");
        //消息消费的时候如何处理消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            //睡30秒
            SleepUtils.sleep(30);
            System.out.println("接收到消息:" + message);
            
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        //采用手动应答
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        });
    }
}

这时候我输入aa,bb,cc,dd,work03会马上收到aa,cc,30秒后work04收到bb,这时候我停掉work04,会发现work03接收到dd,说明dd重新入队并且被work03接收到了

不公平分发

在消费者消费队列之前 加入下面参数即可

//设置不公平分发
int prefetchCount = 1;
channel.basicQos(prefetchCount);

这时候在两个消费者都设置了这个之后,处理快的消费者会消费更多的东西

预取值
//设置预取值,预取值大于1,是几缓冲区就能放几个数据
int prefetchCount = 2;
channel.basicQos(prefetchCount);

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5706054.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存