消息中间件rabbitMQ之第二种消息模型(work quene)

消息中间件rabbitMQ之第二种消息模型(work quene),第1张

消息中间件rabbitMQ之第二种消息模型(work quene)

Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

角色:

P:生产者:任务的发布者

C1:消费者-1,领取任务并且完成任务,假设完成速度较慢

C2:消费者-2:领取任务并完成任务,假设完成速度快

1. 开发生产者
package com.demo.workquene;

import com.demo.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Provider {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection rabbitConnection = RabbitMQUtils.getRabbitConnection();
        Channel channel = rabbitConnection.createChannel();
        channel.queueDeclare("work",true,false,false,null);
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("","work",null,(i+"hello work quene").getBytes());

        }
        RabbitMQUtils.closeRabbitConnection(channel,rabbitConnection);
    }
}
2.开发消费者-1
package com.demo.workquene;

import com.demo.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Customer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getRabbitConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work",false,false,false,null);
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者-1 "+new String(body));
            
            }
        });
    }
}
3.开发消费者-2

跟消费者1的代码几乎一样

4.测试结果

 

总结:默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。  

5.消息自动确认机制
package com.demo.workquene;

import com.demo.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


public class Customer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQUtils.getRabbitConnection();
        Channel channel = connection.createChannel();
        channel.basicQos(1);//设置每次接受一个消息
        channel.queueDeclare("work",true,false,false,null);
        channel.basicConsume("work",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者-1 "+new String(body));
                channel.basicAck(envelope.getDeliveryTag(),false);//手动确认消息
            }
        });
    }
}

设置通道一次只能消费一个消息

关闭消息的自动确认,开启手动确认消息

通过线程睡眠来体现能者多劳的消费者,只需要在消费是睡眠1s即可

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5705719.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存