- 简单说就是一个生产者者,多个消费者共同消费消息,比如有10个消息,就会平均分配给每一个消费者,一人5个消息。
生产者
public class Provider { public static void main(String[] args) throws IOException, TimeoutException { // 获取连接对象 Connection connection = MQConnection.getConnection(); // 获取通道对象 Channel channel = connection.createChannel(); // 通过通道声明队列 channel.queueDeclare("work", true, false, false, null); for (int i = 0; i < 10; i++) { // 生产消息 channel.basicPublish("", "work", null, (i + " hello work queque").getBytes()); } // 关闭资源 MQConnection.closeChannelAndConnection(channel, connection); } }
消费者1
public class Consumer1 { public static void main(String[] args) throws IOException { // 获取连接对象 Connection connection = MQConnection.getConnection(); // 获取通道对象 Channel channel = connection.createChannel(); // 通过通道声明队列 channel.queueDeclare("work", true, false, false, null); channel.basicConsume("work", true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:"+new String(body)); } }); } }
消费者2
public class Consumer2 { public static void main(String[] args) throws IOException { // 获取连接对象 Connection connection = MQConnection.getConnection(); // 获取通道对象 Channel channel = connection.createChannel(); // 通过通道声明队列 channel.queueDeclare("work", true, false, false, null); channel.basicConsume("work", true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2:"+new String(body)); } }); } }
- 以上方式中的消费者弊端显而易见,就是消费者1消费的很快,而消费者2消费的很慢,消费者1不就相当于浪费资源,在那里看着消费者2干活,多少有点不合理吧。所以如果想做到能者多劳的效果(channel.basicQos(1);),并且保证消息不丢失的情况下,希望每次手动进行确认消息(channel.basicAck(envelope.getDeliveryTag(), false);)是否消费完毕,那么有人就会问了,如果在手动ack的时候,mq挂机了,那消息不还是丢失了吗?那么我们可以给它加上事务 *** 作,有问题直接回滚,这样就能保证消息不丢失问题。
public class Consumer2 { public static void main(String[] args) throws IOException { // 获取连接对象 Connection connection = MQConnection.getConnection(); // 获取通道对象 Channel channel = connection.createChannel(); // 每次都从队列里拿一个消息进行消费,消费完成再从队列里获取另一个消息进行消费,这行代码就是实现能者多劳的效果。如果不写的话队列就会一股脑的把消息平均分配给所有消费者,那么就不能实现能者多劳的效果 channel.basicQos(1); // 通过通道声明队列 channel.queueDeclare("work", true, false, false, null); // 关闭自动确认,需要手动确认 channel.basicConsume("work", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消费者2:" + new String(body)); // 手动确认,防止消息还没有消费完成,mq把消息自动删除 // 参数:确认队列中哪个具体消息、是否开启多个消息同时确 channel.basicAck(envelope.getDeliveryTag(), false); } }); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)