MQ的使用及QMQ的设计

MQ的使用及QMQ的设计,第1张

RabbitMQ, Kafka, rocketMQ等数据可以百度查到

QMQ的开发者又谈到QMQ的性能和rocket的性能处于同一个数量级

Kafka和RocketMQ都是基于partition的存储模型, 也就是每一个subject分成一个或者多个partition, 同时consumer消费的时候也是和partition一一对应的

如下 :

在这种设计的模式下, 如果consumer数目大于partition的数据, 就会出现consumer处于空闲的状态

如果partition数据大于consumer的数据就会出现部分consumer繁忙的状况

以上是用基于partition去做负载均衡所带来的问题 由于这种静态绑定的关系, 如果遇到了消费速度更不上消费的速度, 单单的增加consumer是不够的 需要增加partition 尤其是在kafka里, partition是一个比较重的资源, 增加太多的partition还需要考虑集群的处理能力; 同时当高峰期过了之后, 如果想缩容consumer也是比较麻烦的, 因为partition只能增加, 不能减少

上述设计, 同时带了另一个问题, 就是如果有消息积压, 我们增加partition也是没有用的, 因为消费已经挤压到已存在的partition中, 新增partition只能够消费新分配过来的数据

以上是QMQ的存储模型, 方框上方的数字代表该方框自己在log中的偏移量, 方框中的数据代表该项的内容 如何message log上方的3,6,9表示这几条消息在message log中的偏移量 而consume log中方框内的数据3,6,9,20对应着message log的偏移, 表示这几条消息都在topic1中, consume log 方框上方的1,2,3,4代表这几个方框在consume log中的逻辑偏移 下面的pull log 方框中的1,2,3,4对应着consume log的逻辑偏移, 而pull log方框外的数字表示pull log的逻辑偏移

message log 是所有消息的主存储体, 所有topic的消息都进入该log

consume log 存储的是message log的索引

pull log 每个consumer拉取消息的时候会产生pull log, pull log 记录的是拉取消息在consume log中的sequence

这个时候消费者就可以使用pull log上的sequence来表示消费的进度, 这样一来我们就解耦了consumer和partition之间的耦合关系 两者可以任意扩展

分片 + 复制

QMQ不是基于partition的, 可以通过增加更多的机器提高一个topic的可用性 消息按照一定的负载均衡策略, 分配到不同的机器上, 某台机器离线之后, producer将不再将消息发送到server

QMQ通过主从复制来提高单机高可用 QMQ将服务器划分为过个group, 每一个group都包含多个master和slave, 消息的发送和消费全部指向master, slave只保证可用性

一般的消息分为At Most Once, At Least Once, Exactly once 而最后一种属于我们最期望的一种模型, 同时这种模型的实现也不容易 由于网络和应用依赖的复杂性, Exactly once基本不可行, 但是我们可以通过幂等处理来实现最终的Exactly once

QMQ中用到的HashWheelTimer是采用Netty得HashWheelTimer实现的。

如上面的这个图, 假设时间轮大小为8(这个轮子底层用了一个数组实现的) 1s转动一格, 每一格指向一个链表, 这个链表保存着待执行的任务(TimeOutTask)

参考 :
>Mq通道和队列属于辅助的关系,他们两个需要共同建立才可以更好的作用。
扩展资料
一个队列管理器可以有多个队列和多个通道。
队列管理器相当于RabbitMQ中的虚拟主机。
队列分为本地队列,远程队列,传输队列。
通道分为发送通道、接收通道、服务器通道等等。
发送到本地队列上的消息存储在本机上。
发送到远程队列上的消息,通过绑定传输队列传输到别的队列管理器上的本地队列上存储。
通道为消息进出队列的渠道桥梁,发送通道只能出,接收通道只能进,服务器通道可以进出。
本地例子:
发送消息===>Java程序===>服务器通道===>本地队列===>服务器通道===>Python程序===>收到消息
两家公司各自服务器AB例子:
公司A发送消息===>Java程序===>服务器通道A===>远程队列A>>>>传输队列A>>>>发送通道A>>>>接收通道B>>>>本地队列B===>服务器通道B===>Python程序===>公司B收到消息===>响应===>响应消息发送===>Python程序===>服务器通道B===>远程队列B>>>>传输队列B>>>>发送通道B>>>>接收队列A>>>>本地队列A===>服务器通道A===>Java程序===>公司A收到响应消息

有些应用程序需要非常高的吞吐量,而其他一些应用程序却正在发布批处理作业,这些作业可能会延迟一段时间。在设计系统时,目标应该是最大限度地将性能和可用性结合起来,这对您的特定应用程序是有意义的。错误的体系结构设计决策或客户端错误,可能会损坏中间件或影响吞吐量。

您的发布服务器可能会停止运行,或者由于内存使用过多而导致服务器崩溃。本系列文章重点关注rabbitmq的最佳实践。应做和不应做两种不同使用类别的最佳实践相混合;高可用性和高性能(高吞吐量)。我们将讨论队列大小、常见错误、延迟队列、预取值、连接和通道、HIPE和集群中的节点数。这些通常都是最佳实践规则,基于我们在使用rabbitmq时获得的经验。

队列中的许多消息会对RAM的使用造成很大的负担。为了释放RAM,rabbitmq将(页面输出)消息刷新到磁盘。此过程会降低排队速度。当有许多消息需要分页取出时,分页过程通常会花费时间并阻止队列处理消息。许多消息可能会对中间件的性能产生负面影响。

当有许多消息重启集群时,也是费时的,因为必须重建索引。重新启动后,在群集中的节点之间同步消息也需要时间。

在rabbitmq 36中添加了一个名为lazy queues的功能。懒惰队列是消息自动存储到磁盘上的队列。只有在需要时才将消息加载到内存中。对于懒惰的队列,消息直接进入磁盘,因此RAM的使用被最小化,但是吞吐时间将花费更长的时间。

我们已经看到,懒惰的队列以更好的可预测性的方式,创建了一个更稳定的集群。要让您的消息不出现警告,请刷新到磁盘。你不会突然被一个性能冲击问题所困扰。如果您一次发送大量消息(例如处理批处理作业),或者如果您认为您的消费者一直无法跟上发布者的速度,我们建议您启用延迟队列。

对于经常受到消息峰值冲击的应用程序,以及要求吞吐量比其他任何东西都重要的应用程序,可以推荐的另一做法是设置队列的最大长度。这样可以通过丢弃来自队列头部的消息来保持队列的简短性,从而使队列永远不会超过max-length设置。

队列在rabbitmq中是单线程的,一个队列可以处理大约50k条消息/秒。如果您有多个队列和消费者,您可以在多核系统上获得更好的吞吐量。如果在底层节点上拥有与核心一样多的队列,那么您将获得最佳吞吐量。

rabbitmq管理接口为集群中的每个队列收集和计算度量。如果您有数千个活动队列和使用者,这可能会减慢服务器的运行速度。如果队列太多,CPU和RAM的使用也可能受到负面影响。

队列性能受限于一个CPU核心。因此,如果将队列拆分到不同的核心,您将获得更好的性能;如果您拥有rabbitmq集群,您也可以将他们拆分到不同的节点。

rabbitmq队列绑定到最初声明它们的节点。即使您创建了一个rabbitmq中间件集群,所有路由到特定队列的消息都将转到该队列所在的节点。您可以在节点之间平均地手动拆分队列,但缺点是您需要记住队列的位置。

如果您有多个节点或具有多个核心的单节点集群,我们建议使用两个插件来帮助您:

当您想要在生产者和消费者之间共享队列时,为队列命名是很重要的,但是如果您使用临时队列,则不重要。相反,您应该让服务器使用一个随机的队列名称,而不是你自己命名一个——或者修改rabbitmq策略。

客户机连接可能会失败,并可能留下未使用的资源(队列),留下许多队列可能会影响性能。自动删除队列有三种方法:

在 Erlang VM 的内部队列每个队列均使用用了一个优先级别,他们耗费了一些资源。在大多数情况下,不超过5个优先级就足够了。

一个常见的问题是如何处理发送到rabbitmq的消息的palyload(消息大小)。当然,您不应该在消息中发送非常大的文件信息,但是每秒的消息数是一个比它本身的消息大小更大的瓶颈。发送多个小消息可能是一个坏的选择。一个更好的办法是将它们捆绑成一个更大的消息,让消费者将其拆分。但是,如果捆绑多条消息,则需要记住这可能会影响处理时间。如果其中一条捆绑消息失败,是否需要重新处理所有这些消息?如何设置这个取决于带宽和体系结构。

每个连接使用大约100kb的RAM(如果使用TLS,甚至更多)。数千个连接可能是rabbitmq服务器的沉重负担。在最坏的情况下,服务器可能由于内存不足而崩溃。AMQP协议有一种称为“多路复用”的机制,它“复用”单个TCP连接。它建议每个进程只创建一个TCP连接,并在这个唯一一个连接的基础上为不同的线程使用多个通道。连接也应该是长连接的。AMQP连接的握手过程非常复杂,至少需要7个TCP数据包(如果使用了TLS,则需要更多)。

相反,如果需要,可以更频繁地打开和关闭通道。如果可能的话,甚至通道也应该是长寿命的,例如,在每个发布信息线程中复用相同的通道。每次发布信息时不用打开频道。最佳实践是复用连接,使用各通道在一个连接的基础上实现多路复用。理想情况下,每个进程只能有一个连接,然后在应用程序中为每个线程使用一个通道,而每个channel 复用同一个连接即可。

您还应该确保不在线程之间共享通道,因为大多数客户机不保证通道是线程安全的(因为这样会对性能产生严重的负面影响)。

确保不要在线程之间共享通道,因为大多数客户机不会使通道线程安全(因为这样会对性能产生严重的负面影响)。

为发布者和消费者区分连接以获得高吞吐量。当发布服务器向服务器发送太多要处理的消息时,rabbitmq可以对TCP连接施加反向压力。如果消费者使用相同的TCP连接,服务器可能不会从客户机接收消息确认。因此,消费性能也会受到影响。而随着消费速度的降低,服务器将不堪重负。

具有大量连接和通道的另一个影响为rabbitmq管理接口的性能。对于每个连接和通道性能,指标必须收集、分析和显示度量。

在连接失败的情况下,传输中的消息可能会丢失,并且可能需要重新传输此类消息。Acknowledgements 让服务器和客户机知道何时重新传输消息。客户机可以在收到消息时对其进行确认,也可以在客户机完全处理完消息后对其进行确认。Acknowledgement 具有性能影响,因此为了实现最快的吞吐量,应该禁用手动确认。

接收重要消息的消费应用程序在完成需要对其进行的任何 *** 作之前不应确认消息,这样未处理的消息(工作进程崩溃、异常等)就不会丢失。

发布确认,是相同的事情,但用于发布。服务器收到来自发布服务器的消息时会进行确认。发布确认也会影响性能。但是,应该记住,如果发布者至少需要处理一次消息,就需要这样做。

所有未确认的消息必须驻留在服务器上的RAM中。如果您有太多未确认的消息,您将耗尽内存。限制未确认消息的一个有效方法是客户端预取的消息数做出相关设置。在预取部分了解有关预取的更多信息。

如果您不能承受丢失任何消息的代价,请确保您的队列声明为“持久”,并且您的消息以传递模式“持久”发送。

为了避免在中间件中丢失消息,需要为中间件重新启动、中间件硬件故障或中间件崩溃时做好准备。为了确保消息和中间件定义在重新启动后仍然存在,我们需要确保它们在磁盘上。在中间件重新启动期间,不持久的消息、交换和队列将会被丢失。

持久性消息更重,因为它们必须写入磁盘。请记住,即使您发送的是临时消息,懒惰的队列也会对性能产生相同的影响。对于高性能-请使用瞬态消息。

您可以通过amqps连接到rabbitmq,这是用tls包装的amqp协议。由于所有流量都必须加密和解密,因此TLS会影响性能。为了获得最大的性能,我们建议使用vpc对等,那么流量是私有的,并且是独立的,不涉及AMQP客户机/服务器。

在cloudamqp中,我们将rabbitmq服务器配置为只接受快速但安全的加密密码并确定其优先级。

预取值用于指定多少条消息将同时被发送给消费者。它被用来从你的消费者那里得到尽可能多的东西(饱和工作)。

From RabbitMQcom: “The goal is to keep the consumers saturated with work, but to minimise the client's buffer size so that more messages stay in Rabbit's queue and are thus available for new consumers or to just be sent out to consumers as they become free”
来自rabbitmqcom:“我们的目标是让消费者饱和工作,但要最大限度地减小客户机的缓冲区大小,因此更多的消息被留在Rabbit的队列中,从而对新的消费者可用,或者发送给那些变得空闲的消者。”

rabbitmq的默认预取设置为客户端提供了一个不受限制的缓冲区,这意味着rabbitmq在默认情况下会将尽可能多的消息发送给任何看起来准备接受它们的客户机。发送的消息由rabbitmq客户端库(在使用者中)缓存,直到对其进行处理。预取限制了在确认消息之前客户端可以接收的消息数。所有预取的消息都将从队列中删除,并且对其他使用者不可见。
A too small prefetch count may hurt performance since RabbitMQ is most of the time waiting to get permission to send more messages The image below is illustrating long idling time In the example, we have a QoS prefetch setting of 1 This means that RabbitMQ won't send out the next message until after the round trip completes (deliver, process, acknowledge) Round time in this picture is in total 125ms with a processing time of only 5ms
预取数太小可能会影响性能,因为rabbitmq大多数时间都在等待获得发送更多消息的许可。下图显示的是长时间的空转时间。在本例中,QoS预取设置为1。这意味着rabbitmq在往返完成(传递、处理、确认)之前不会发送下一条消息。中的整个周期时间总共为125ms,处理时间仅为5ms。

另一方面,大量的预取数可以接收队列中的大量消息并将其传递给同一个消费者,但是其他使用者却处于空闲状态。

如果您有一个或几个消费者快速处理消息,我们建议您一次预取多个消息。尽量让你的客户端繁忙。如果您一直有大约相同的处理时间,并且网络行为保持不变-您只需在客户机上为每个消息计算总的往返时间/处理时间,即可获得估计的预取值。

如果您有许多消费者,并且处理时间很短,我们建议预取值设置的应该比单个或少数使用者要低一些。太低的值会让消费者空转很多,因为他们需要等待消息到达。过高的值可能会使一个消费者忙碌,而其他消费者则处于空闲状态。

如果您有许多使用者和/或处理时间较长,我们建议您将预取计数设置为1,以便消息在所有消费者中均匀分布。

请注意,如果客户端自动确认消息,则预取值将不起作用。

一个典型的错误是有一个无限的预取,其中一个客户机接收所有的消息,耗尽内存并崩溃,然后所有的消息都被重新传递。

有关rabbitmq预取的信息,请参阅推荐的rabbitmq文档。

HIPE将以增加启动时间为代价增加服务器吞吐量。启用HIPE时,将在启动时编译rabbitmq。根据我们的基准测试,吞吐量增加了20-80%。HIPE的缺点是启动时间也增加了很多,大约1-3分钟。在rabbitmq的文档中,hipe仍然被标记为实验性的。

如果您需要高可用性,请不要启用HIPE。

当您用一个节点创建一个cloudamqp实例时,您将得到一个具有高性能的单个节点。一个节点将为您提供 最高的性能 ,因为消息不需要在多个节点之间进行镜像。

当您使用两个节点创建一个CloudAMQP实例时,与单个节点的相比,您将获得一半的性能。节点位于不同的可用性区域,队列在可用性区域之间自动镜像。两个节点将为您提供 高可用性 ,因为一个节点可能崩溃或被标记为受损,但另一个节点仍将启动并运行,准备接收消息。

当您使用三个节点创建一个CloudAMQP实例时,与单个节点的相同计划大小相比,您将获得1/4的性能。节点位于不同的可用性区域,队列在可用性区域之间自动镜像。您也可以暂停少数组件-与允许每个节点响应相比,通过关闭少数组件,您减少了重复传递。暂停少数组件是三节点集群中的一种分区处理策略,它可以防止由于网络拆分而导致数据不一致。

我们在cloudamqp集群中注意到的一个常见错误是,用户创建了一个新的vhost,但忘记为新的vhost启用一个ha策略。如果没有HA策略,则不会在节点之间同步消息。

直接交换是最快速。如果有许多bindings ,rabbitmq必须计算将消息发送到何处。

有些插件可能非常好用,但它们可能会消耗大量的CPU或RAM。因此,不建议将它们用于生产服务器。确保禁用不使用的插件。您可以通过CloudAmqp中的控制面板启用许多不同的插件。

将rabbitmq管理统计速率模式设置为detailed会严重影响性能,不应在生产中使用。

确保您使用的是最新推荐的客户端库版本

保持最新稳定版本的rabbitmq和erlang。在为客户发布新的主要版本之前,我们通常会在很大程度上对其进行测试。请注意,在为新集群选择版本的下拉列表中,我们始终使用最推荐的版本作为所选选项(默认)。

Dead lettering和TTL是rabbitmq中的两个流行功能,应该谨慎使用。TTL和Dead lettering可以产生您没有预料到的性能影响。

使用x-dead-letter-exchange属性声明的队列将向指定的dead-letter-exchange 发送被拒绝、非确认或过期(带有ttl)的消息。如果您指定了x-dead-letter-routing-key,则消息的路由键将在dead lettered时更改。

通过使用x-message-ttl属性声明队列,如果消息在指定的时间内未被使用,则将从队列中丢弃消息。

public static void main(String[] args) throws JMSException {
String url = "tcp://localhost:61616";
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
url);
// 设置用户名和密码,这个用户名和密码在conf目录下的credentialsproperties文件中,也可以在activemqxml中配置
connectionFactorysetUserName("system");
connectionFactorysetPassword("manager");
// 创建连接
Connection connection = connectionFactorycreateConnection();
connectionstart();
Session session = connectioncreateSession(false, SessionCLIENT_ACKNOWLEDGE);
// 创建目标,就创建主题也可以创建队列
Destination destination = sessioncreateQueue("1230");
// 创建消息消费者
MessageProducer producer = sessioncreateProducer(destination);
producersetDeliveryMode(DeliveryModePERSISTENT);
String selector = "JMSCorrelationID='" + "1001" + "'";
MessageConsumer consumer = sessioncreateConsumer(destination,selector);
TextMessage tm = sessioncreateTextMessage("12345111");
tmsetJMSCorrelationID("1001");
producersend(tm);
while(true){
TextMessage tm1 = (TextMessage) consumerreceive(1000);
if(null!=tm1){
Systemoutprintln(tm1);
tm1acknowledge();
Systemoutprintln(tm1getText());
}
else
break;
}
consumerclose();
producerclose();
sessionclose();
connectionclose();
}
}

RabbitMQ是2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,简称MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法,由Erlang(专门针对于大数据高并发的语言)语言开发,可复用的企业消息系统,是当前最主流的消息中间件之一,具有可靠性、灵活的路由、消息集群简单、队列高可用、多种协议的支持、管理界面、跟踪机制以及插件机制。



1消息 就是数据,增删改查的数据。例如在员工管理系统中增删改查的数据

2队列 指的是一端进数据一端出数据,例如C#中(Queue数据结构)



1消息队列指:一端进消息,一端出消息

2RabbitMQ就是实现了消息队列概念的一个组件,以面向对象的思想去理解,消息队列就是类,而RabbitMQ就是实例,当然不仅仅只有RabbitMQ,例如ActiveMQ,RocketMQ,Kafka,包括Redis也可以实现消息队列。



1在常见的单体架构中,主要流程是用户UI *** 作发起>是一种基于代理的发布/订阅的消息协议。与请求/回答这种同步模式不同,发布/订阅模式解耦了发布消息的客户(发布者)与订阅消息的客户(订阅者)之间的关系,这意味着发布者和订阅者之间并不需要直接建立联系。一个发布者可以对应多个订阅者,当发布者发生变化的时候,他可以将消息一一通知给所有的订阅者。这种模式提供了更大的网络扩展性和更动态的网络拓扑此外运用MQTT协议,设备可以很方便地连接到物联网云服务,管理设备并处理数据,最后应用到各种业务场景。
2 、服务质量
MQTT提供三种质量的服务:
至多一次(qos = 0),可能会出现丢包的现象。使用在对实时性要求不高的情况。这一级别可应用于如下情景,如环境传感器数据,丢失一次读记录无所谓,因为很快下一次读记录就会产生。
至少一次(qos = 1),保证包会到达目的地,但是可能出现重包。
正好一次(qos = 2),保证包会到达目的地,且不会出现重包的现象。这一级别可用于如计费系统等场景,在计费系统中,消息丢失或重复可能会导致生成错误的费用。
3 、主题与通配符
主题名称(Topic name)用来标识已发布消息的信息的渠道。订阅者用它来确定接收到所关心的信息。它是一个分层的结构,用斜线“/”作为分隔符(这个有点类似于restful风格)。主题还可以通过通配符进行过滤。其中,+可以过滤一个层级,而#只能出现在主题最后表示过滤任意级别的层级。值得注意的是MQTT允许使用通配符订阅主题,但是并不允许使用通配符广播。
4、 遗嘱
当一个客户端断开连接的时候,它希望客户端可以发送它指定的消息。该消息和普通消息的结构相同。通过设置该位并填入和信息相关的内容即可(后面会有介绍)。
5、 消息类型
消息类型 类型 编码说明
reserved 0 保留
connect 1 客户端到服务端的连接请求
connACK 2 服务端对连接请求的响应
publish 3 发布消息
puback 4 新发布消息确认,是QoS 1给PUBLISH消息的回复
pubRec 5 QoS 2消息流的第一部分,表示消息发布已记录
pubRel 6 QoS 2消息流的第二部分,表示消息发布已释放
pubComp 7 QoS 2消息流的第三部分,表示消息发布完
subscribe 8 客户端订阅某个主题
subBack 9 对于SUBSCRIBE消息的确认
unsubscribe 10 客户端终止订阅的消息
unsubBack 11 对于UNSUBSCRIBE消息的确认
pingReq 12 心跳
pingResp 13 确认心跳
disconnect 14 客户端终止连接前通知MQTT代理
reserved 15 保留
二、MQTT 服务端(实现MQTT协议的中间件)
MQTT协议有很多开源的broker可以用,这里我整理了两个,分别是Apollo和mosquito;详细的服务端选择可以去上面的服务端链接中选。以下所讲的环境配置和服务器搭建都是在MAC环境上,其他平台的可以参见选用broker的详细文档。
1、Apollo-Broker搭建
Apache Apollo是一个代理服务器,其是在ActiveMQ基础上发展而来的,可以支持多种协议,如:STOMP、AMQP、MQTT、SSL等。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zz/13420873.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-08-02
下一篇 2023-08-02

发表评论

登录后才能评论

评论列表(0条)

保存