工作的流程:
- 当生产者生产消息后,将消息发往队列.
- 当队列中有消息时,消费者会实时的监听队列中的消息.如果有消息则会执行消息
默认的传统队列是为均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳。
采用工作队列
在通道中只需要设置basicQos为1即可,表示MQ服务器每次只会给消费者推送1条消息必须手动ack确认之后才会继续发送。channel.basicQos(1);
public class Consumer1 { private static final String QUEUE_NAME = "yao-queue"; public static void main(String[] args) throws IOException, TimeoutException, IOException, TimeoutException { // 1.创建连接 Connection connection = RabbitMQConnection.getConnection(); // 2.设置通道 Channel channel = connection.createChannel(); //指定我们消费者每次批量获取消息 channel.basicQos(2); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("消费者获取消息:" + msg); try { // 消费者完成 删除该消息 channel.basicAck(envelope.getDeliveryTag(), false); }catch (Exception e){ } } }; // 3.监听队列 channel.basicConsume(QUEUE_NAME, false, defaultConsumer); } }3,Fanout exchange(扇型交换机)发布订阅模式
生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就多个不同的消费者。
原理:
- 需要创建两个队列 ,每个队列对应一个消费者;
- 队列需要绑定我们交换机
- 生产者投递消息到交换机中,交换机在将消息分配给两个队列中都存放起来;
- 消费者从队列中获取这个消息。
private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { // 创建Connection Connection connection = RabbitMQConnection.getConnection(); // 创建Channel Channel channel = connection.createChannel(); // 通道关联交换机 channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true); String msg = "6666"; channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); channel.close(); connection.close(); }
private static final String QUEUE_NAME = "heng-queue"; private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("邮件消费者..."); // 创建我们的连接 Connection connection = RabbitMQConnection.getConnection(); // 创建我们通道 final Channel channel = connection.createChannel(); // 关联队列消费者关联队列 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("邮件消费者获取消息:" + msg); } }; // 开始监听消息 自动签收 channel.basicConsume(QUEUE_NAME, true, defaultConsumer); } } private static final String QUEUE_NAME = "yao-queue"; private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { System.out.println("短信消费者..."); // 创建我们的连接 Connection connection = RabbitMQConnection.getConnection(); // 创建我们通道 final Channel channel = connection.createChannel(); // 关联队列消费者关联队列 channel.queueBind (QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("短信消费者获取消息:" + msg); } }; // 开始监听消息 自动签收 channel.basicConsume(QUEUE_NAME, true, defaultConsumer); } }4,Direct exchange(直连交换机)路由模式
当交换机类型为direct类型时,根据队列绑定的路由建转发到具体的队列中存放消息
// 生产者 通道关联交换机 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true); String msg = "6666"; channel.basicPublish(EXCHANGE_NAME, "email", null, msg.getBytes()); //消费者1 // 关联队列消费者关联队列 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email"); //消费者2 // 关联队列消费者关联队列 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms");5,Topic exchange(主题交换机)主题模式
当交换机类型为topic类型时,根据队列绑定的路由建模糊转发到具体的队列中存放。
#号表示支持匹配多个词;
*号表示只能匹配一个词
// 生产者 通道关联交换机 channel.exchangeDeclare(EXCHANGE_NAME, "topic", true); String msg = "6666"; channel.basicPublish(EXCHANGE_NAME, "yao.sms", null, msg.getBytes()); //消费者1 // 关联队列消费者关联队列 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "yao.*"); //消费者2 // 关联队列消费者关联队列 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "hang.*");
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)