生产者发送消息的过程
(1)生产者连接到RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel)。
ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); // 创建TCP连接 Connection connection = factory.newConnection(); // 创建信道 Channel channel = connection.createChannel();
(2)生产者声明一个交换器,并设置相关属性,比如交换器类型、是否持久化等。
// 创建一个type="topic"、持久化的、非自动删除的交换器 channel.exchangeDeclare(EXCHANGE_NAME, ROUTING_TYPE, true, false, null);
(3)生产者声明一个队列并设置相关属性,比如是否排他、是否持久化,是否自动删除等。
// 创建一个持久化、非排他的、非自动删除的队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null);
(4)生产者通过路由键将交换器和队列绑定起来。
// 将交换器与队列通过路由键进行绑定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
(5)生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息。
// 发送一条消息 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, "消息内容".getBytes(StandardCharsets.UTF_8));
(6)相应的交换器根据接收的路由键查找相匹配的队列
(7)如果找到,则从生产者发送过来的消息存入相应的队列中。
(8)如果没有找到,则根据生产者配置的属性选择丢弃还是退回给生产者。
(9)关闭信道。
channel.close();
(10)关闭连接。
connection.close();
消费者接收消息的过程
(1)消费者连接到RabbitMQ Broker,建立一个连接(Connection)、开启一个信道(Channel)。
(2)消费者向RabbitMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作。
(3)等到RabbitMQ Broker回应并投递相应队列中的消息,消费者接收消息。
(4)消息者确认(ack)接收到的消息。
(5)RabbitMQ从队列中删除相应已经被确认的消息。
(6)关闭信道。
(7)关闭连接。
Connection:连接 Channel:信道
无论是生产者还是消费者,都需要和RabbitMQ Broker建立连接,这个连接就是一条TCP连接,也就是Connection。
一旦TCP连接建立起来,客户端紧接着可以创建一个AMQP信道(Channel),每一个信道都会被指派一个唯一的ID。信道是建立在Connection之上的虚拟连接,RabbitMQ处理的每条AMQP指令都是通过信道完成的。
完全可以直接使用Connection就能完成信道的工作,为什么还要引入信道?
如果一个应用程序中有很多线程需要从RabbitMQ中消费消息或者生产消息。那么必然需要建立很多个Connection,也就是许多个TCP连接。然而对于 *** 作系统而言,建立和销毁TCP连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也就随之显现。
RabbitMQ采用类似NIO的做法,选择TCP连接复用,不仅可以减少性能开销,同时也便于管理。
每个线程把持一个信道,所以信道复用了Connection的TCP连接,同时RabbitMQ可以确保每个线程的私密性,就像拥有独立的连接一样。
当每个信道的流量不是很大时,可以复用单一的Connection,有效节省TCP连接资源。
当信道本身的流量很大时,复用同一个Connection就会产生性能瓶颈,此时需要开辟多个Connection,将这些信道均摊到这些Connection中。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)