RabbitMq系列之三:web管理端

RabbitMq系列之三:web管理端,第1张

各组件解释如下:

AMQP 消息的路由中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为"dog",则只转发 routing key 标记为"dog"的消息,不会转发"dogpuppy",也不会转发"dogguard"等等。它是完全匹配、单播的模式。

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号"#"和符号" "。

"#"匹配0个或多个单词," "匹配不多不少一个单词。

RabbbitMQ 的分发机制非常适合扩展,而且它是专门为并发程序设计的,如果现在 load 加重,那么只需要创建更多的 Consumer 来进行任务处理。

在实际应用中,可能会发生消费者收到 Queue 中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给 RabbitMQ,RabbitMQ 收到消息回执(Message acknowledgment)后才将该消息从 Queue 中移除;如果 RabbitMQ 没有收到回执并检测到消费者的 RabbitMQ 连接断开,则 RabbitMQ 会将该消息发送给其他消费者(如果存在多个消费者)进行处理。这里不存在 timeout 概念,一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的 RabbitMQ 连接断开。 这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给 RabbitMQ,这将会导致严重的 bug——Queue 中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑。

另外 pub message 是没有 ack 的。

如果我们希望即使在 RabbitMQ 服务重启的情况下,也不会丢失消息,我们可以将 Queue 与 Message 都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的 RabbitMQ 消息不会丢失。但依然解决不了小概率丢失事件的发生(比如 RabbitMQ 服务器已经接收到生产者的消息,但还没来得及持久化该消息时 RabbitMQ 服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。由于这里仅为 RabbitMQ 的简单介绍,所以这里将不讲解 RabbitMQ 相关的事务。

要持久化队列 queue 的持久化需要在声明时指定 durable=True;

这里要注意,队列的名字一定要是 Broker 中不存在的,不然不能改变此队列的任何属性

队列和交换机有一个创建时候指定的标志 durable,durable 的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列中的消息会在重启后恢复。

消息持久化包括 3 部分

如果 exchange 和 queue 都是持久化的,那么它们之间的 binding 也是持久化的,如果 exchange 和 queue 两者之间有一个持久化,一个非持久化,则不允许建立绑定

注意:一旦创建了队列和交换机,就不能修改其标志了,例如,创建了一个 non-durable 的队列,然后想把它改变成 durable 的,唯一的办法就是删除这个队列然后重现创建。

你可能也注意到了,分发机制不是那么优雅,默认状态下,RabbitMQ 将第 n 个 Message 分发给第 n 个 Consumer。n 是取余后的,它不管 Consumer 是否还有 unacked Message,只是按照这个默认的机制进行分发

那么如果有个 Consumer 工作比较重,那么就会导致有的 Consumer 基本没事可做,有的 Consumer 却毫无休息的机会,那么,Rabbit 是如何处理这种问题呢?

RabbitMQ 使用 ProtoBuf 序列化消息,它可作为 RabbitMQ 的 Message 的数据格式进行传输,由于是结构化的数据,这样就极大的方便了 Consumer 的数据高效处理,当然也可以使用 XML,与 XML 相比,ProtoBuf 有以下优势:

    生产者创建消息,然后发布到 RabbitMQ 。消息一般可以包含两个部分:消息体和标签

    消息体也可以称之为 payload ,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个 JSON 字符串。当然可以进一步对这个消息体进行序列化 *** 作。消息的标签用来表述这条消息,比如一个交换器的名称和一个路由键。生产者把消息交由 RabbitMQ,RabbitMQ 之后会根据标签把消息发送给感兴趣的消费者( Consumer )。

    消费者连接到 RabbitMQ 服务器,并订阅到队列上。当消费者消费这条消息时,只是消费消息的消息体( payload )。在消息路由的过程中,消息的标签会丢弃,存入到队列中的消息只有消息体,消费者也只会消费到消息体,也就不知道消息的生产者是谁,当然消费者也不需要知道。

    对于 RabbitMQ 来说, RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点

    多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin ,即轮询)

给多个消费者进行处理,而不是每个消费者都收到所有的消息井处理,RabbitMQ 不支持队列层面的广播消费。

    真实情况是,生产者将消息发送到 Exchange (交换器,通常也

可以用大写的“X”来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,或

许会返回给生产者,或许直接丢弃。

    生产者将消息发给交换器的时候, 一般会指定一个RoutingKey ,用来指定这个消息的路由规则,而这个 Routing Key 需要与交换器类型和绑定键( BindingKey )联合使用才能最终生效。

    RabbitMQ 中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键(Binding Key)。

    生产者将消息发送给交换器时, 需要一个RoutingKey,BindingKey和RoutingKey 相匹配的时候,消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的BindingKey。BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型,比如fanout类型的交换器就会无视BindingKey,而是将消息路由到所有绑定到该交换器的队列中。

    无论是生产者还是消费者,都需要和RabbitMQ Broker建立连接,这个连接就是一条TCP连接,也就是Connection。一旦TCP连接建立起来,客户端可以紧接着创建一个AMQP信道(Channel)。

    信道是建立在 Connection 之上的虚拟连接,

我们完全可以直接使用 Connection 就能完成信道的工作,为什么还要引入信道呢?

Connection 是RabbitMQ的socket连接,风住哪个了socket协议相关部分逻辑。

ConnectionFactory 是Connection的工厂。

Channel 是RabbitMQ提供的非常重要的接口,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等 *** 作都是其中完成的。

客户端与RabbitMQ连接时,首先要建立和RabbitMQ代理之间的tcp连接。一旦tcp打开,你的应用会创建一个 AMQP channel。这个channel是在“真实”的tcp连接里面的一个“虚拟”的连接,你可以通过channel发送amqp命令。每个channel都有个唯一的ID。如果通过tcp发送amqp命令,则会消耗大量的tcp连接(tcp连接是有限的)。当创建一个线程的时候,会在tcp连接上创建一个channel,这个线程拥有私有的与RabbitMQ沟通的路径,并且不会在系统的tcp栈上添加额外的负荷。一个tcp连接上面可以建立的amqp channel数量是没有限制的。

如果有一个消息生产者或者消息消费者通过amqp-client的客户端连接至节点1进行消息的发布或者订阅,那么此时的集群中的消息收发只与节点1相关。

如果消息生产者所连接的是节点2或者节点3,此时队列1的完整数据不在该两个节点上,那么在发送消息过程中这两个节点主要起了一个路由转发作用,根据这两个节点上的元数据转发至节点1上,最终发送的消息还是会存储至节点1的队列1上。

RabbitMQ 集群是一个或多个节点的逻辑分组,每个节点共享用户、虚拟主机、队列、交换器、绑定、运行时参数和其他分布式状态。

一些分布式系统 有leader和follower节点。 对于 RabbitMQ 来说, RabbitMQ集群中的所有节点都是平等的。

RabbitMQ 集群可以通过多种方式组成:

RabbitMQ 节点绑定到端口以接受客户端和 CLI 工具连接。其他进程和工具(例如 SELinux)可能会阻止 RabbitMQ 绑定到端口。发生这种情况时,节点将无法启动。

CLI工具、客户端库和 RabbitMQ 节点也会打开连接(客户端 TCP 套接字)。防火墙会阻止节点和 CLI 工具相互通信。确保可以访问以下端口:

RabbitMQ节点和 CLI 工具(例如 rabbitmqctl)使用 cookie 来确定它们是否被允许相互通信。为了让两个节点能够通信,它们必须具有相同的共享密钥,称为 Erlang cookie。cookie 只是一串最多 255 个字符的字母数字字符。 它通常存储在本地文件中。该文件必须只能由所有者访问(例如具有 600 或类似的 UNIX 权限)。每个集群节点必须具有相同的 cookie。

在 UNIX 系统上,cookie通常是位于/var/lib/rabbitmq/erlangcookie(由服务器使用)和$HOME/erlangcookie(由 CLI 工具使用)。

RabbitMQ 节点使用主机名相互寻址

<!== 所有主机执行 ==>

<!== 所有主机执行 ==>

<!== 所有主机执行 ==>

默认配置文件/usr/lib/rabbitmq/lib/rabbitmq_server-3717/ebin/rabbitapp

<!== node01主机执行 ==>

<!== node02主机执行 ==>

<!== node03主机执行 ==>

<!== 所有主机执行 ==>

因RabbitMQ 集群元数据同步基于 cookie 共享方案实现

文件路径/var/lib/rabbitmq/erlangcookie

<!== node02、node03主机执行 ==>

<!== 任意主机执行 ==>

节点分为:磁盘节点及内存节点

RAM节点是一种特殊情况,可用于提高具有高队列、交换或绑定搅动的集群的性能。RAM节点不提供更高的消息速率。 官方建议在绝大多数情况下,仅使用磁盘节点。

如果一个集群中都是RAM节点,那么集群停止,将无法再次启动,并将丢失所有数据

官方提示:经典队列镜像将在未来版本中删除,考虑改用仲裁队列或非复制经典队列

每个镜像队列由一个领导副本和一个或多个镜像副本,leader被托管的节点成为leader节点。首先应用给定队列的所有 *** 作 在队列的leader节点上,然后传播到镜像。

如果承载队列的leader节点出现故障,则只要已同步,最旧的镜像将升级为新的leader。根据队列镜像参数,也可以升级未同步的镜像。

队列通过策略启用了镜像,策略模式的说明如下:

每当队列的策略发生变化时,它都保持其现有的镜像尽可能适用新策略。

设置的镜像队列可以通过开启的网页的管理端Admin->Policies,也可以通过命令。

管理端界面:

命令行:

为避免集群中的某个节点托管大多数leader队列,因此导致高负载,leader队列应合理均匀的分布在集群节点上。控制leader队列分布节点策略有三种,可以在rabbitmqconf文件中定义queue_master_locator参数设置

修改节点策略可能会导致现有的leader队列没有在新的策略中,为了防止消息丢失,RabbitMQ会保留现有的leader直到至少另一个镜像已同步,一旦发生同步,消费者将与leader断开连接,需要重新连接。

如果leader故障,其他镜像升级为leader过程如下:

如果消费者使用 自动确认模式 ,那么消息可能会丢失。这与非镜像队列没有什么不同:代理认为消息一旦以自动确认模式发送给消费者,就会被确认。

如果客户端突然断开连接,则可能永远不会收到消息。在镜像队列的情况下,如果leader死亡,那些正在以 自动确认模式 发送给消费者的消息可能永远不会被这些客户端接收,并且不会被新leader重新排队。由于消费客户端连接到存活节点的可能性,消费者取消通知有助于识别此类事件何时发生。 当然,在实践中,如果数据安全性不如吞吐量重要,那么自动确认模式是可行的方法。

节点可以随时加入集群。 根据队列的配置,当节点加入集群时,队列可能会在新节点上添加镜像。 此时,新镜像将是空的:它不会包含队列中任何现有的内容。 这样的镜像将接收发布到队列的新消息,因此随着时间的推移将准确地表示镜像队列的尾部。 随着消息从镜像队列中排出,新镜像丢失消息的队列头部的大小将缩小,直到最终镜像的内容与leader的内容完全匹配。 在这一点上,镜像可以被认为是完全同步的。

新添加的镜像不提供添加镜像之前存在的队列内容的额外形式的冗余或可用性,除非队列已明确同步。 由于在发生明确同步时队列变得无响应,因此最好允许正在从中排出消息的活动队列自然同步,并且仅明确同步非活动队列。

启用自动队列镜像时,请考虑所涉及队列的预期磁盘数据集 。 具有庞大数据集(例如,数十 GB 或更多)的队列必须将其复制到新添加的镜像中,这会给集群资源(例如网络带宽和磁盘 I/O)带来很大的负载。

要查看镜像状态(是否同步):

手动同步队列:

取消正在进行的同步:

如果你停止一个包含镜像队列leader的 RabbitMQ 节点,其他节点上的一些镜像将被提升为leader。 如果继续停止节点,那么将到达一个镜像队列不再有镜像的点:它仅存在于一个节点上,且该节点上它是leader。 如果它的最后一个剩余节点关闭,但是镜像队列被声明为持久的,那么队列中的持久消息将在该节点重新启动后继续存在。

然而,镜像目前无法知道它的队列内容是否已经偏离了它重新加入的leader。 因此,当一个镜像重新加入一个镜像队列时, 它会丢弃已经拥有的任何持久本地内容并开始为空

默认情况下,RabbitMQ 将拒绝leader节点在受控关闭(即明确停止 RabbitMQ 服务或关闭 *** 作系统)时提升非同步镜像,以避免消息丢失; 相反,整个队列将关闭,就好像未同步的镜像不存在一样。

leader节点不受控制的关闭(即服务器或节点崩溃,或网络中断)仍将触发未同步镜像的提升。

如果您希望在所有情况下都让leader队列移动到未同步的镜像(即,您会选择队列的可用性而不是避免由于未同步的镜像升级而导致的消息丢失),那么将 ha-promote-on-shutdown 策略键设置为 always 而不是比它的默认值 when-synced 。

如果 ha-promote-on-failure 策略键设置为 when-synced ,则即使 ha-promote-on-shutdown 键设置为 always ,也不会提升未同步的镜像。 这意味着如果leader节点发生故障,队列将变得不可用,直到leader恢复。 如果leader队列永久丢失,队列将不可用,除非它被删除(这也将删除其所有内容)并重新声明。

当队列的所有镜像都关闭时,可能会丢失队列的leader。 在正常 *** 作中,队列关闭的最后一个节点将成为leader,该节点在再次启动时仍然是leader(因为它可能收到了其他镜像没有看到的消息)。

但是,当您调用 rabbitmqctl forget_cluster_node 时,RabbitMQ 将尝试为每个在我们忘记的节点上有其领导者的队列找到当前停止的镜像,并在它再次启动时“提升”该镜像成为新的领导者。 如果有多个候选者,将选择最近停止的镜像。

重要的是要理解 RabbitMQ 只能在 forget_cluster_node 期间提升停止的镜像,因为任何再次启动的镜像都会清除它们的内容,如上面“停止节点和同步”中所述。 因此在停止的集群中移除丢失的leader时,您必须在再次启动镜像之前调用 rabbitmqctl forget_cluster_node 。

客户端通过 TCP 连接到 RabbitMQ Server。

连接成功后 RabbitMQ 会创建一个 AMQP 信道。

信道是创建在 TCP 上的虚拟连接,AMQP 命令都是通过信道发送出去的,每个信道都会有一个唯一的 ID,不论是发布消息,订阅队列或者介绍消息都是通过信道完成的。

设置如下:

只做队列持久化是不行的还需要在加上消息持久化

消息生产者

消息消费者

:在修改消费者的 routing_key 后,需要重新创建队列。

消息生产者

在生产者中只需要修改 exchange_declare 他的 type 为 fanout 即可

消息消费者

例如:

接收者(消费者) routing_key='a'

发送者(生产者) routing_key = 'abcd'

结果:匹配失败

因为:

表示匹配一个单词

# 表示匹配0个或多个单词

消息生产者

消息消费者

消息生产者

消息消费者

三个任务,任务1、任务2、任务3。他们之间有相互的制约。执行 任务1 的前提是要有 任务2 的结果。执行 任务2 需要 任务3 的结果。

一般会使用 crontab 来做计划任务。预估一下每个任务的完成时间。然后制定任务。当数量变大处理时间变成,就需要经常修改 crontab 任务。

使用 RabbitMQ 后每个任务结束时只需要发送一个结束信息即可

如: 任务2 订阅 任务3 的信息。当 任务3 完成后发送一个完成消息。 任务2 接收到完成消息后开始执行,在执行结束后发送 任务2 完成消息。 任务1 订阅 任务2 的消息,然后执行。

有三个用户A、B、C 他们分别发送文章,但后台会根据用户的级别做不同的 *** 作。

A 是普通用户,系统发布。

B 是 VIP 用户,系统发布和推荐给关注这部分内容的客户。

C 是黑卡客户,系统发布、推荐给关注这部分内容的客户、在这个分类中置顶这篇文章。

文章发布服务只关心是否成功,剩下的 *** 作都不关心。可以使用 RabbitMQ 服务将其他 *** 作分离。

有一个 *** 作 A。用户执行这个任务后,又非常需要结果。

为什么不是后台直接将结果发送给用户呢。因为一旦增加了订阅用户,就需要修改后台程序,这样很恶心。

RabbitMQ 自带管理后台,安装后需要配置开启

进入 RabbitMQ 安装目录中的 sbin 目录执行

>

MQ全程为message queue,即消息队列。是一种跨进程、异步通信机制、用于上下游传递消息。RabbitMQ是由Erlang语言开发,基于 AMQP 协议(Advanced Message Queuing Protocol 高级消息队列协议)实现的消息队列,它是一种应用程序之间的通信方法,消息队列在实际开发应用中有着非常广泛的使用。下面主要介绍RabbitMQ的基础、架构以及在开发过程中遇到的一些常用问题,还有在面试过程中一些长问的问题。

RabbitMQ官网 : >

以上就是关于RabbitMq系列之三:web管理端全部的内容,包括:RabbitMq系列之三:web管理端、消息队列之RabbitMQ-分布式部署、RabbitMQ - 介绍等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/9771126.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-01
下一篇 2023-05-01

发表评论

登录后才能评论

评论列表(0条)

保存