一个生产者、多个消费者的消费模式
消费通道每次获取一个消息:channel.basicQos(1);
channel.basicConsume("msg",false,new DefaultConsumer(channel){
autoAck 设为 false,关闭自动确认消息
获取消息后手动确认
package wordqueues;

import com.jia.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;

import java.io.IOException;

/**
 * Producer side of the demo: publishes a batch of text messages to the
 * "msg" queue so that several consumers can share the work.
 *
 * <p>NOTE(review): the consumers in this file are declared in package
 * {@code com.jia.wordqueues} while this class uses {@code wordqueues} --
 * confirm which one is intended.
 */
public class Provider {

    /**
     * Declares the "msg" queue and publishes 200 messages ("消息0" .. "消息199")
     * to it with an empty exchange name and "msg" as the routing key.
     *
     * <p>The channel and connection are released in a {@code finally} block so
     * they are not leaked when declaring the queue or publishing fails.
     *
     * @throws IOException if creating the channel, declaring the queue, or
     *                     publishing a message fails
     */
    @Test
    public void sendMsg() throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        try {
            // NOTE(review): the queue is declared with durable=false while the messages
            // are published as PERSISTENT_TEXT_PLAIN -- confirm whether the queue should
            // be durable too (the consumers must then declare it with the same flags).
            channel.queueDeclare("msg", false, false, false, null);
            for (int i = 0; i < 200; i++) {
                // NOTE(review): getBytes() uses the platform default charset; the consumers
                // decode with the platform default as well -- consider switching all three
                // classes to an explicit UTF-8 together.
                channel.basicPublish("", "msg", MessageProperties.PERSISTENT_TEXT_PLAIN, ("消息" + i).getBytes());
            }
        } finally {
            // Always release broker resources, even if a publish failed part-way.
            RabbitMQUtils.closeConnectAndChanel(channel, connection);
        }
    }
}
消费者
package com.jia.wordqueues;

import com.jia.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * First consumer of the "msg" queue: prints each message as soon as it
 * arrives and acknowledges it manually.
 */
public class Consumer1 {

    /**
     * Opens a channel, limits it to one unacknowledged message at a time,
     * and registers a consumer on the "msg" queue with auto-ack disabled.
     *
     * @param args unused
     * @throws IOException if any channel operation fails
     */
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        final Channel channel = connection.createChannel();

        // Take only one message at a time on this channel.
        channel.basicQos(1);
        channel.queueDeclare("msg", false, false, false, null);

        DefaultConsumer printingConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // NOTE(review): decodes with the platform default charset, matching
                // what the producer in this file does when encoding.
                String text = new String(body);
                System.out.println(text);

                // Manual acknowledgement; multiple=false confirms only this delivery.
                long deliveryTag = envelope.getDeliveryTag();
                channel.basicAck(deliveryTag, false);
            }
        };

        // Second argument is autoAck: false means the handler must ack explicitly.
        channel.basicConsume("msg", false, printingConsumer);
    }
}
package com.jia.wordqueues;

import com.jia.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * Second consumer of the "msg" queue: identical to a plain printing consumer
 * except that it sleeps for 1 ms before handling each message, which makes it
 * the slower of the two consumers in the demo.
 */
public class Consumer2 {

    /**
     * Opens a channel, limits it to one unacknowledged message at a time,
     * and registers a consumer on the "msg" queue with auto-ack disabled.
     *
     * @param args unused
     * @throws IOException if any channel operation fails
     */
    public static void main(String[] args) throws IOException {
        // Obtain the connection.
        Connection connection = RabbitMQUtils.getConnection();
        final Channel channel = connection.createChannel();
        // Take only one message at a time on this channel.
        channel.basicQos(1);
        // Declare the queue.
        channel.queueDeclare("msg", false, false, false, null);
        channel.basicConsume("msg", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                boolean interrupted = false;
                try {
                    // Simulated processing delay.
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    // Remember the interruption instead of swallowing it; the flag is
                    // restored below once the current message has been handled.
                    interrupted = true;
                }
                try {
                    // NOTE(review): decodes with the platform default charset, matching
                    // what the producer in this file does when encoding.
                    System.out.println(new String(body));
                    // Manual acknowledgement; multiple=false confirms only this delivery.
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } finally {
                    if (interrupted) {
                        // Restore the interrupt status so the thread's owner can observe it.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        });
    }
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)