Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
角色:
P:生产者:任务的发布者
C1:消费者-1,领取任务并且完成任务,假设完成速度较慢
C2:消费者-2:领取任务并完成任务,假设完成速度快
1. 开发生产者package com.demo.workquene; import com.demo.utils.RabbitMQUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Provider { public static void main(String[] args) throws IOException, TimeoutException { Connection rabbitConnection = RabbitMQUtils.getRabbitConnection(); Channel channel = rabbitConnection.createChannel(); channel.queueDeclare("work",true,false,false,null); for (int i = 0; i < 10; i++) { channel.basicPublish("","work",null,(i+"hello work quene").getBytes()); } RabbitMQUtils.closeRabbitConnection(channel,rabbitConnection); } }2.开发消费者-1
package com.demo.workquene; import com.demo.utils.RabbitMQUtils; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Customer1 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMQUtils.getRabbitConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("work",false,false,false,null); channel.basicConsume("work",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者-1 "+new String(body)); } }); } }3.开发消费者-2
跟消费者1的代码几乎一样
4.测试结果
总结:默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。
5.消息自动确认机制package com.demo.workquene; import com.demo.utils.RabbitMQUtils; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Customer1 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMQUtils.getRabbitConnection(); Channel channel = connection.createChannel(); channel.basicQos(1);//设置每次接受一个消息 channel.queueDeclare("work",true,false,false,null); channel.basicConsume("work",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者-1 "+new String(body)); channel.basicAck(envelope.getDeliveryTag(),false);//手动确认消息 } }); } }
设置通道一次只能消费一个消息
关闭消息的自动确认,开启手动确认消息
通过线程睡眠来体现能者多劳的消费者,只需要在消费是睡眠1s即可
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)