- RabbitMQ 的常见模式
- 简单 (simple) 模式
- 工作 (work) 模式
- 发布 / 订阅 (pub/sub) 模式
- 路由 (routing) 模式
- 主题 (Topic) 模式
- 在下图中,“P”是生产者,“C”是消费者。中间的框是一个队列(保存消息的地方)。
- 公用代码:连接到RabbitMQ服务器
public class RabbitConnection { public static Connection getConnection() throws Exception { ConnectionFactory factory = new ConnectionFactory(); // address factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); // Virtual Host默认节点 return factory.newConnection(); } }简单 (simple) 模式
simple 模式,是 RabbitMQ 几种模式中最简单的一种模式,其结构如下图所示:
- simple模式有以下几个特征:
- 只有一个生产者、一个消费者和一个队列。
- 生产者和消费者在发送和接收消息时,只需要指定队列名,而不需要指定发送到哪个 Exchange,RabbitMQ 服务器会自动使用 Virtual host 的默认的 Exchange,默认 Exchange 的 type 为 direct。
生产者
public class Send { private static final String QUEUE_NAME = "simple_mq"; //消息队列名称 public static void main(String[] args) throws Exception { //获取连接 Connection connection = RabbitConnection.getConnection(); //创建频道 Channel channel = connection.createChannel(); //队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 发送的消息 String message = "hello,I'm is simple_mq.Who are you?"; // 信道发送消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent.......... '" + message + "'"); } }
消费者
public class ReceiV { private static final String QUEUE_NAME = "simple_mq"; public static void main(String[] args) throws Exception { Connection connection = RabbitConnection.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; //监听队列 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {}); } }工作 (work) 模式
在 simple 模式下只有一个生产者和消费者,当生产者生产消息的速度大于消费者的消费速度时,我们可以添加一个或多个消费者来加快消费速度,这种在 simple 模式下增加消费者的模式,称为 work 模式,如下图所示:
- work 模式有以下两个特征:
- 可以有多个消费者,但一条消息只能被一个消费者获取。
- 发送到队列中的消息,由服务器平均分配给不同消费者进行消费。
生产者
public class Send { private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = RabbitConnection.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消费者发送确认消息(ACK)之前,只发送一个消息给你 channel.basicQos(1); for (int i = 0; i < 50; i++) { String message = "work+" + i; System.out.println(message); channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); Thread.sleep(i * 20); } } }
channel.basicQos() void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException; 参数: prefetchSize:消息的大小 prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack global:是否将上面设置应用于channel,简单点说,就是上面限制是channel级别的还是consumer级别
消费者
public class ReceiveOne { private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = RabbitConnection.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); //需要修改 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [ConsumerOne is] Received '" + message + "'"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //这里是手动应答 } }; boolean autoAck = false; //这里需要修改自动应答为false channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {}); } }
public class ReceiveTwo { private static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { //建立连接 Connection connection = RabbitConnection.getConnection(); //创建频道 Channel channel = connection.createChannel(); //队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 不公平分发 channel.basicQos(1); //需要修改 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [ConsumerTwo is] Received '" + message + "'"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //这里是手动应答 } }; boolean autoAck = false; //这里需要修改自动应答为false //监听队列 channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {}); } }
channel.basicAck() void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException; 参数: deliveryTag:该消息的index multiple:是否批量处理.true:将一次性ack所有小于deliveryTag的消息 requeue:被拒绝的是否重新入队列发布 / 订阅 (pub/sub) 模式
work 模式可以将消息转到多个消费者,但每条消息只能由一个消费者获取,如果我们想一条消息可以同时给多个消费者消费呢?这时候就需要发布 / 订阅模式,其示意图如下所示:
- 从上面的示意图我们可以看出来,在发布 / 订阅模式下,需要指定发送到哪个 Exchange 中。
- 发布 / 订阅模式中,Exchange 的 type 为 fanout。
- 生产者发送消息时,不需要指定具体的队列名,Exchange 会将收到的消息转发到所绑定的队列。
- 消息被 Exchange 转到多个队列,一条消息可以被多个消费者获取。
- 在上图中,oneQueue 中的消息要么被 CustomerA 获取,要么被 CustomerB 获取。也就是同一条消息,要么是 CustomerA + CustomerC 消费、要么是 CustomerB + CustomerC 消费。
生产者
public class Send { private static final String EXCHANGE_NAME = "test_exchange"; public static void main(String[] args) throws Exception { Connection connection = RabbitConnection.getConnection(); // 创建信道 Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //send a msg String msg = "hello exchange"; channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); System.out.println("send:" + msg); channel.close(); connection.close(); } }
消费者
public class ReceiveOne { private static final String QUEUE_NAME = "oneQueue"; private static final String EXCHANGE_NAME = "test_exchange"; public static void main(String[] args) throws Exception { Connection connection = RabbitConnection.getConnection(); Channel channel = connection.createChannel(); //队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); //绑定队列到交换机转发器 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); //这里需要绑定exchange,其他的和前面的work_queue是一样的 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [ConsumerOne is] Received '" + message + "'"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //这里是手动应答 } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } }
public class ReceiveTwo { private static final String QUEUE_NAME = "twoQueue"; private static final String EXCHANGE_NAME = "test_exchange"; public static void main(String[] args) throws Exception { Connection connection = RabbitConnection.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [ConsumerTwo is] Received '" + message + "'"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //这里是手动应答 } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } }路由 (routing) 模式
前面几种模式,消息的目标队列无法由生产者指定,而在路由模式下,消息的目标队列,可以由生产者指定,其示意图如下所示:
- 从上面的示意图我们可以看出来:
- 路由模式下Exchange的 type 为direct。
- 消息的目标队列可以由生产者按照routingKey规则指定。
- 消费者通过BindingKey绑定自己所关心的队列。
- 一条消息队可以被多个消息者获取。
- 只有RoutingKey与BidingKey相匹配的队列才会收到消息。
RoutingKey用于生产者指定Exchange最终将消息路由到哪个队列,BindingKey用于消费者绑定到某个队列。
生产者
public class Send { private static final String EXCHANGE_NAME = "test_exchange_direct_routing"; public static void main(String[] args) throws Exception { Connection connection = RabbitConnection.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); //send a msg String msg = "hello direct"; String routingKey = "info"; //这里是定义的routingKey channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println("send:" + msg); channel.close(); connection.close(); } }
消费者
// 只能接收error public class ReceiveOne { private static final String QUEUE_NAME = "receive1_queue_routing"; private static final String EXCHANGE_NAME = "test_exchange_direct_routing"; public static void main(String[] args) throws Exception { Connection connection = RabbitConnection.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); String routingKey = "error"; //表示只能接收error channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [ConsumerOne is] Received '" + message + "'"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //这里是手动应答 } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } }
// 能接收error,info 和 warning public class ReceiveTwo { private static final String QUEUE_NAME = "receive2_queue_routing"; private static final String EXCHANGE_NAME = "test_exchange_direct_routing"; public static void main(String[] args) throws Exception { Connection connection = RabbitConnection.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); // 设置了三种key channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); //注意这里,我们设置了三个路由key channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [ConsumerTwo is] Received '" + message + "'"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //这里是手动应答 } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } }主题 (Topic) 模式
主题模式是在路由模式的基础上,将路由键和某模式进行匹配。其中#表示匹配多个词,*表示匹配一个词,消费者可以通过某种模式的 BindKey 来达到订阅某个主题消息的目的,如示意图如下所示:
- 从上面的示意图我们可以看出来:
- 主题模式 Exchange 的 type 取值为 topic。
- 一条消息可以被多个消费者获取。
生产者
public class Send { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { //获取连接 Connection connection = RabbitConnection.getConnection(); //创建频道 try (Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "topic"); MapbindingKeyMap = new HashMap<>(); bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到"); bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到"); bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到"); bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到"); bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次"); bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃"); bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃"); bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2"); for (Map.Entry bindingKeyEntry: bindingKeyMap.entrySet()){ String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8")); System.out.println("生产者发出消息" + message); } } } }
消费者
public class ReceiveOne { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { Connection connection = RabbitConnection.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //声明 Q1 队列与绑定关系 String queueName="Q1"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*"); System.out.println("等待接收消息........... "); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" 接 收 队 列 :"+queueName+" 绑 定 键:"+ delivery.getEnvelope().getRoutingKey()+",消息:"+message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); } }
public class ReceiveTwo { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { Connection connection = RabbitConnection.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //声明 Q2 队列与绑定关系 String queueName="Q2"; channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit"); channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#"); System.out.println("等待接收消息........... "); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" 接 收 队 列 :"+queueName+" 绑 定 键:"+ delivery.getEnvelope().getRoutingKey()+",消息:"+message); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); } }
你知道的越多,你不知道的越多。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)