目录《Kafka篇》 简述kafka的架构设计原理(入口点)
- 《Kafka篇》
- 简述kafka的架构设计原理(入口点)
- 消息队列有哪些作用(简单)
- 消息队列的优缺点,使用场景(基础)
- 消息队列如何保证消息可靠传输
- 死信队列是什么?延时队列是什么?(经典)
- 简述kafka的rebalance机制(比较深入)
- 简述kafka的副本同步机制(比较深入)
- kafka中zookeeper的作用
- kafka中的pull、push的优劣势分析
- kafka中高读写性能原因分析
- kafka高性能高吞吐的原因
- kafka消息丢失的场景以及解决方案(重点)
- kafka为什么比RocketMQ的吞吐量高
- kafka、ActiveMQ、RabbitMQ、RocketMQ对比
- 《RabbitMQ篇》
- RabbitMQ架构设计
- RabbitMQ的交换器类型
- RabbitMQ的普通集群模式
- RabbitMQ的镜像队列原理
- RabbitMQ持久化机制
- RabbitMQ事务消息
- RabbitMQ如何保证消息的可靠性传输
- RabbitMQ的死信队列原理
- RabbitMQ是否可以直连队列
- 《RocketMQ篇》
- 简述RocketMQ架构设计
- 简述RocketMQ持久化机制
- RocketMQ怎么实现顺序消息
- RocketMQ的底层实现原理
- RocketMQ如何保证不丢失消息
- 《MQ总结篇》
- 如何设计一个MQ
- 如何进行产品选型
- 如何保证消息的顺序
无论是那种MQ都会存在三个:producer、MQ的cluster、consumer的group
kafka中还多出了zookeeper,用来维护集群的。
注意分区是将一个整体分割到不同的分区上,主从则是都保留数据整体,不过是主与副本的关系。
Broker:单独的机器
Consumer Group:消费者组,消费者组内每个消费者负责消费不同分区的数据,提高消费能力。逻辑上的一个订阅者
Topic:可以理解为一个队列,Topic将消息分类,生产者和消费者面向的是同一个Topic,它是可以分区的,存在不同的Broker中
Partition:为了实现扩展性,提高并发能力。一个Topic以多个Partition的方式分布到多个Broker上,每个Partition是一个有序的队列。一个Topic的每个Partition都有若干个副本,一个Leader和若干个Follower。生产者发送数据的对象以及消费者消费的数据对象都是Leader。(这一点可以从图中看出,红色的虚线便是如此)Follower负责实时从Leader中同步数据,保持和Leader数据同步。Leader发生故障时,某个Follower会被重新选举为新的Leader。
一个Topic是一个消息主题,是一个逻辑概念。Partition也是逻辑概念。Topic1内部有两个分区:P1、P2,P1有主从、P2也有主从,都是可以设置的。
如果某个Partition设置的主从数小于Broker数,那么不会是每个Broker机子上都有副本。
如果设置的主从数大于Broker数,那么多余的Partition会冗余再Broker中。
接下来看消费者组。组1里面有C1 C2,组2里面有C3 C4。
组1只消费Topic1,组2只消费Topic2
C1消费Topic1的P0。C2消费Topic1的P1,消费的都是Leader节点。这是正常模式。
下面是非正常模式:
C3 C4都消费的是Topic2的P0,此时C3 与C4会形成互斥。当业务高峰期时,MQ中消息堆积过多,可以增加group中的消费者实例,加速消费。
zookeeper则时负责维护broker,与broker维持心跳,哪个broker宕机了,zookeeper都是可以感知到的。
并且生产者与消费者需要锁定分区的Leader,这个信息可以到zookeeper中去取。
1、解耦:使用消息队列来作为两个系统直接的通讯方式,两个系统不需要相互依赖了
2、异步:系统A给消费队列发送完消息之后,就可以继续做其他事情了
3、流量削峰:如果使用消息队列的方式来调用某个系统,那么消息将在队列中排队,由消费者自己控制消费速度。将流量从高峰期引入到低谷期进行处理,起到缓冲作用
优点:
1、解耦:使用消息队列来作为两个系统直接的通讯方式,两个系统不需要相互依赖了
2、异步:系统A给消费队列发送完消息之后,就可以继续做其他事情了
3、流量削峰:如果使用消息队列的方式来调用某个系统,那么消息将在队列中排队,由消费者自己控制消费速度
缺点:
1、增加了系统复杂度,加上了与MQ交互的逻辑,带入了幂等、重复消费、消息丢失等问题
2、系统可用性降低,MQ的故障会影响系统可用
3、一致性,消费端可能失败。A端将消息送入MQ后就不知道B端对消息处理是否成功。
使用场景:
日志采集:日志量较大时不希望影响到正常的业务,使用MQ异步传送出去,允许小部分的重复记录、记录消失
发布订阅:类似与监听,对感兴趣的消费MQ中的消息
消息可靠传输代表两层意思:不多也不少
1、为了保证消息不多,也就是消息不能重复,也就是生产者不能重复生产消息,或者消费者不能重复消费消息:
- 要确保消息不多发,这个不容易出现,难以控制
- 从MQ本身来看,尽管有ack或offset的机制,在网络不好或者消费者宕机时,这些标志上传会失败。所以MQ也不能保证正确感知消息是否被消费
- 要避免不重复消费,最保险机制就是消费者实现幂等性,保证就算是重复消费,也不会出现问题。具体来讲,就是不管是MQ push消息还是消费者pull消息都要保证。幂等的概念就是用相同的参数请求C端,处理结果不会因为次数的增加而改变。这边提供三个方案:
1、如果是写redis,就没问题,每次都是set,天然幂等性。但是键值对的超时时间会随着刷set而往后延。
2、生产者发送消息的时候带上一个全局唯一的id,消费者拿到消息后,先根据这个id去redis里查一下,之前有没有被消费过,没有消费过就处理,并且写入这个id到redis。如果消费国了,则不处理
3、基于数据库的唯一键,主键唯一的话,重复的记录就不会被插入
2、消息不能少,也就是消息不能丢失,生产者发送的消息,消费者一定要能消费到:
- 生产者发送消息时,要确认broker确实收到并持久化了这条消息,比如RabbitMQ的/confirm/i机制,Kafka的ack机制都可以保证生产者能正确的将消息发送给broker
- broker要等待消费者真正确认消费到了消息时才删除掉消息,这里通常就是消费端ack机制,消费者接收到一条消息后,如果确认没问题了,就可以给broker发送一个ack,broker接收到ack后才会删除消息
1、死信队列也是一个消费队列,用来存放那些没有成功消费的消息,(重试之后还是失败则进入死信队列),可以用来作为消息重试
2、进入到这个队列中的消息,需要等待设置的时间之后才能被消费者消费到,延时队列就是用来存放需要在指定时间被处理的元素的队列,通常可以用来处理一些具有过期性 *** 作的业务,如十分钟内未支付就取消订单
该机制会影响kafka的读写性能,在rebalance时,读写会进入阻塞,直到rebalance完成,所以需要尽量避免rebalance。
rebalance指的是consumer group(消费者组)中的消费者和topic下的partion(分区)重新匹配的过程
假设组里面有3个消费者,topic将C1 C2 C3其分区到P1 P2 P3中,进行对应消费
如果C1宕掉了,意味着P1没有消费者来消费了,此时就会进行rebalance 。
又或者,此时group中多了C4 C5 C6,那么此时最好是将它们均分三个Partion,也就是说P1的消息只会发往C1 或者C4,此时也需要进行rebalance。也就是说,新加入节点,需要将分区数与消费者数目进行重新计算匹配。
总结一下何时会产生rebalance
- 1、consumer group 中的成员个数发生变化
- 2、consumer 消费超时,一直没有提交offset
- 3、group订阅的topic个数发生变化
- 4、group订阅的topic的分区数发生变化
所以对应减少rebalance的方法有:
1、超时阈值调大
2、在业务低峰期的时候人工增加topic和partion
那么rebalance具体是什么样的 *** 作呢,下面介绍coordinator发现 group 中的成员个数发生变化主动进行rebalance的 *** 作过程:
coordinator:通常是partion的leader节点(一个partion是有多个副本的,存在leader与follower节点)所在的broker,负责监控group中的consumer的存活,consumer维持到coordinator的心跳(消费者定时向协调者上报心跳),判断consumer是否消费超时
- coordinator通过心跳返回通知consumer进行rebalance。举例,一个group中有C1 C2 C3,此时C1挂了,要进行rebalance,协调者也需要通知C1 C2不能进行消费,由于消费者与协调者之间是通过心跳通信,协调者通过回复心跳,通知消费者进入rebalance状态
- consumer请求coordinator加入group,coordinator会知道有哪些消费者请求它加入group,也就知道了group中有哪些消费者是存活的,coordinator就会选举产生leader consumer
- leader consumer从coordinator获取所有的consumer,然后将partion与所有的consumer进行分配,然后将分配结果封装成syncGroup,发送syncGroup(分配信息)到coordinato
- coordinator拿到分配信息后,通过心跳机制将分配信息下发给consumer,consumer拿到分配信息后就知道它该去消费哪个partion了
- 至此,完成rebalance
还有一种情况,就是leader consumer 监控topic or partion的变化,通知coordinator触发rebalance,之后的流程与上述一致。
rebalance存在的问题:如果C1消费消息超时(并没有提交offset),触发了rebalance,重新分配后,该消息极有可能会被其他消费者C2拿去消费,此时C1消费完成提交offset(表示该消息已经处理完了),那么C2消费完之后也会提交一个offset,导致错误
解决方案如下:coordinator每次rebalance,会标记一个Generation(表示rebalance的周期数)给到consumer,每次rebalance该Generation会+1,consumer提交offset的时候,coordinator会比对Generation,不一致则拒绝提交
之前有提到partion有leader与follower机制的存在,follower节点可能存在多份。
leader负责处理读写请求,follower不处理客户端请求,只负责从leader那边拉取数据,可以理解为主备模式。当leader挂掉之后,由follower进行选举,follower唯一的功能就只是数据同步。
先看看日志在partition中是如何存储的:
kafka的消息是基于append的顺序追加,所以partition中消息也是有顺序的,可以通过offset来确定消息在partition中的具体位置
下面是消息队列的组成结构:
顺序是从下往上开始
LEO:下一条待写入位置
firstUnstableOffset:第一条未提交数据
LastStableOffset:最后一条已提交数据
LogStartOffset:起始位置
当isolation.level = read_committed,意思是只能够都已提交数据:只能消费到LastStableOffset
当isolation.level = read_uncommitted,意思是能够读到已提交和未提交数据,即可以消费到HW的上一条消息
正常情况下HW应该和LEO位置重合,如果是read_uncommitted的话。但是由于存在ISR机制。
举例partition中有1个Leader和6个followers(f1 ~ f2),ISR只维护6个副本中与Leader中一致的信息,若follower中只有f2 ~ f3与leader消息一致,那么ISR中只保存(f2、f3、Leader)的HW。消费者来消费时,不取决于f2、f3、Leader的HW,而是取决于其中最小的HW,即分区的HW = min(follower.HW, Leader.HW)
一个partition对应的ISR中最小的LEO作为分区的HW,consumer最多只能消费到HW所在的位置
leader收消息后(offset肯定要移动)会更新本地的LEO,leader还会维护follower的LEO即remote_LEO。follower会发出一个fetch同步数据的请求(携带自身的LEO)给leader,leader就知道了ISR列表中所有follower的remote_LEO,然后比较得出最小的remote_LEO,然后作为分区的HW,然后进行更新,再把HW数据响应给follower ,follower拿到HW之后更新自身的HW(取响应的HW和自身LEO中的较小值),然后进行数据落盘,然后LEO+1。所以总的来说follower是异步的形式进行更新HW
ISR:如果一个follower落后leader不超过某个时间阈值,那么则在ISR,否则放在OSR中。
在同步副本的时候,follower获取leader的LEO和LogStartOffset,与本地对比。如果本地的LogStartOffset超出了leader的值,则超过这个值的数据删除,再进行同步,如果本地的小于leader的,那么直接同步。
注意,同步的时候可能会导致消息丢失,leader接受到消息更新完本地后,LEO还没相应给follower的时候,leader自己就挂掉了。然后重启之后原leader就变成follower了(重新选举了),那么它再去向新leader同步的时候就会把原本本地没有同步出去的消息给删除,也代表着这个消息就丢失了。
zookeeper负责的是集群的管理功能,后面的迭代中zk已经不再了。
看看zk在kafka中存储了哪些节点信息吧:
/brokers/ids:临时节点,kafka连接到zk后创建的节点,保存所有broker节点信息,存储broker的物理地址、版本信息、启动时间等,节点名称为brokerID,broker定时发送心跳到zk,如果断开则该brokerID节点就会被删除。
/brokers/topics:临时节点,节点保存broker节点下所有的topic信息,每一个topic节点下包含了一个固定的partitions节点(/brokers/topics/partitions),partitions的子节点就是topic的分区,每个分区下保存一个state节点,保存着当前leader分区和ISR(可靠的从节点列表)的brokerID,state节点由leader创建,若leader宕机该节点会被删除,直到有新的leader选举产生、重新生成state节点
/consumer/[group_id]/owners/[topic]/[broker_id-partition_id]:维护消费者和分区的注册关系(哪个消费者消费哪个分区)
/consumer/[group_id]/offsets/[topic]/[broker_id-partition_id]:分区消息的消息进度offset
cilent通过topic找到topic树下的state节点,获取leader的brokerID,到broker树中找到brokerID的物理地址,但是cilent不会直接连着zk,而是通过配置的broker获取到zk中的信息。
kafka中的pull、push的优劣势分析pull模式:
- 根据consumer的消费能力进行数据拉取,可以控制速率
- 可以批量拉取,也可以单条拉取
- 可以设置不同的提交方式,实现不同的传输语义
缺点:如果kafka没有数据,会导致consumer空循环,消耗资源
解决:通过参数设置,consumer拉取数据为空或者没有达到一定数量时进行阻塞
push模式:
不会导致consumer循环等待。
缺点:速率固定,忽略了consumer的消费能力,可能导致拒绝服务或者网络拥塞等情况
原因两点:顺序写 + 零拷贝
kafka是一个文件系统,不基于内存,而是直接硬盘存储,因此消息堆积能力能强。
顺序写:利用磁盘的顺序访问速度可以接近内存,kafka的消息都是append *** 作,partition是有序的,节省了磁盘的寻道时间,同时通过批量 *** 作节省了写入次数,partition(逻辑概念)物理上分为多个segment文件存储,方便删除
传统:
- 读取磁盘文件数据到内核缓冲区
- 将内核缓冲区的数据copy到用户缓冲区
- 将用户缓冲区的数据copy到socket的发送缓冲区
- 将socket发送缓冲区中的数据发送到网卡、进行传输
零拷贝: - 直接将内核缓冲区的数据发送到网卡传输,节省了数据在内核态与用户态直接的传递
- 使用的是 *** 作系统的指令支持
kafka不太依赖jvm,主要是用的 *** 作系统的pageCache(页存,之后会刷新到磁盘中),如果生产消费速率相当,则直接用pageCache交换数据,不需要经过磁盘IO
1、磁盘顺序读写:保证了消息的堆积
- 顺序读写:磁盘会预读,即在读取的起始地址连续读取多个页面,主要时间花费在了传输时间,而这个时间两种读写可以认为是一样的
- 随机读写,因为数据没有在一起,预读将会浪费时间,需要多次寻道和旋转延迟,而这个时间可能是传输时间的许多倍
2、零拷贝:避免CPU将数据从一块存储拷贝到另外一块存储 - 传统的数据拷贝:
1、读取磁盘文件数据到内核缓冲区
2、将内核缓冲区的数据copy到用户缓冲区
3、将用户缓冲区的数据copy到socket的发送缓冲区
4、将socket发送缓冲区的数据发送到网卡,进行传输 - 零拷贝:
磁盘文件->内核空间读取缓冲区->网卡接口->消费者进程
3、分区分段 + 索引
kafka的message消息实际上是分布式存储在一个一个小的segment中的,每次文件 *** 作也是直接 *** 作的segment。为了进一步的查询优化,kafka又默认为分段后的数据文件建立了索引文件,就是文件系统上的,index文件。这种分区分段 + 索引的设计,不仅提升了数据读取的效率,同时也提高了数据 *** 作的并行度(有点类似与分段锁)
4、批量压缩:存储不是直接存储原文,而是多条消息一起压缩,降低带宽。消费端收到消息后再解压
5、批量读写
6、直接 *** 作的是pageCache,而不是JVM,避免GC耗时及对象创建耗时,且读写速度更高。进程重启缓存也不会丢失
1)消息发送时出现丢失的场景以及解决
1、ack = 0 ,不重试 生产者发送消息完不管结果了,如果发送失败,消息也就丢失了 2、ack = 1, leader 宕机了 生产者发送消息完,只等待leader写入成功就返回了,但是leader之后宕机了,自此follower还没来得及同步,消息丢失 3、unclean.leader.election.enable 配置true 允许选举ISR以外的副本作为leader,也会导致数据丢失,默认为false。生产者发送异步消息之后,只等待leader写入成功就返回了,然后leader宕机了,这时ISR中没有follower,leader会从OSR中选举,因为OSR中的follower节点本身就落后与leader,就会造成消息丢失 解决方案: 1、配置:ack = all / -1, tries > 1, unclear.leader.election.enable : false 生产者发送消息完,等待follower同步完再返回,如果异常则重试,副本的数量此时可能会影响吞吐量 不允许选举ISR以外的副本作为leader 2、配置:min.insync.replicas > 1,设置越大表示越可靠 副本指定必须确认写 *** 作成功的最小同步副本数量,如果不能满足这个最小值,则生产者将引发一个异常(要么是NotEnoughReplicas,要么是NotEnoughReplicasAfterAppend) min.insync.replicas和ack是有区别的, min.insync.replicas(同步副本数量)指的是ISR中的数量必须要大于1 ack = all / -1,表示ISR中的所有节点全部要确认 此间还存在一个隐性的逻辑关系,只有ack = all / -1,那么min.insync.replicas才会生效。 所以这两个参数要搭配着来使用,这样就可以确保如果大多数副本没有收到写 *** 作,则生产者将引起异常。 3、失败的offset单独记录 生产者发送消息,会自动重试,遇到不可恢复异常会抛出,这时可以捕获异常记录到数据库或缓存,进行单独处理
2)消费端
1、先commit offset再处理消息,如果再处理消息的时候出现异常了,但是offset已经提交了,这条消息对于该消费者来说就是丢失的,再也不会消费到了. 2、先处理消息,处理完了再commit,有可能存在重复消费的情况。在处理完这条消息之后,还没来得及commit,就宕机了,重启之后还回去消费这条消息。 解决方案: 先做业务处理,再去commit,如果出现重复消费,就只需要保证接口的幂等性就行了
3)broker端的刷盘
从生产者发送出来的消息实际上是缓存在broker的pageCache上的,然后linux保证pageCache上的数据被刷入硬盘中。如果linux此时宕机了,那么就会有部分pageCache上的数据丢失了。
于是可以通过配置参数,减少系统刷盘间隔
kafka的生产者采用的是异步发送消息机制,当发送一条消息时,消息并没有发送到broker节点上,而是先缓存起来,然后直接向业务返回成功,当缓存的消息积累到一定数量时再批量发送给broker。这种做法减少了网络io,从而提高了消息发送的吞吐量,但是如果消息生产者产生了宕机,会导致消息丢失,业务出错,所以理论上来说kafka利用此机制提高了性能却降低了可靠性。
kafka、ActiveMQ、RabbitMQ、RocketMQ对比站在应用的角度来看:
ActiveMQ:JMS规范,支持事务、支持XA协议,没有生产大规模支撑场景、官方维护越来越少
RabbitMQ:erlang语言开发、性能好、高并发,支持多种语言,社区、文档方面有优势,erlang语言不利于java二次开发,依赖开源社区的维护和升级,需要学习AMP协议,学习成本相对较高
以上吞吐量单机都在万级
kafka:高性能、高可用,生产环境有大规模使用场景,单机容量有限(超过64个分区响应明显变长)、社区更新慢
吞吐量单机百万
RocketMQ:java实现,方便二次开发,设计参考了kafka,高可用、高可靠,社区活跃度一般,支持语言较少。
吞吐量单机十万
connection:与MQ交互是通过connection,需要建立一个TCP连接,一个connection里面可以开多个信道(channel),这些信道会复用这个TCP连接 。
Broker:rabbitmq的服务节点
Queue:队列,是RabbitMQ的内部对象,用于存储消息。RabbitMQ中消息只能存储在队列中,生产者投递消息到队列,消费者从队列中获取消息并消费。多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(轮询)给多个消费者进行消费,而不是每个消费者都收到所有的消息进行消费。(注意,RabbitMQ不支持队列层面的广播消费,如果需要广播消费,可以采用一个交换器通过路由Key绑定到多个队列,由多个消费者来订阅这些队列)
Exchange:交换器,生产者将消息发送到Exchange,由交换器将消息路由到一个或多个队列中。交换器与不同的队列通过绑定键绑定
RoutingKey:路由Key,生产者将消息发送给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则。这个路由Key需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效(生产者指定的的RoutingKey会与BindingKey进行匹配,匹配规则与交换器类型有关)。在交换器类型和绑定键固定的情况下,生产者可以在发送消息给交换器时通过指定RoutingKey来决定消息流向哪里。
消息发送流程:
生产者将routeKey、exchangeName、body通过信道传递到broker里面,根据exchangeName找到交换机,用该交换机的匹配规则将routeKey匹配到现有的BindingKey,如果匹配上了,将消息投放到对应的queue里面。
消费流程:由Pull和Push两种方式
多个消费者消费同一个queue的话,queue里面的一条消息只会被一个消费者消费到
如果要发布订阅功能 ,生产者想要让多个消费者收到同一个消息,只需要通过交换器分发到多个queue上去即可。
vhost:虚拟主机的概念。一个broker其实就是一个物理主机,vhost其实就是虚拟主机,可以在一个broker上建立多个vhost。每个vhost都包含着自己的Exchange和Queue。应用可以指定其中一个虚拟机,所以一个rabbitmq可以给多个不同的应用使用,同时也是应用隔离的
交换器分发会先找出绑定的队列,然后再判断routekey,来决定是否将消息分发到某一个队列中
RabbitMQ的交换器类型决定了routeKey与BindingKey如何匹配,是精准匹配还是模糊匹配
有下面几种匹配规则:
fanout:扇形交换器,不再判断routekey,直接将消息分发到所有绑定的队列
direct:判断routekey的规则是完全匹配模式,即发送消息时指定的routekey要等于绑定的routekey
topic:判断routekey的规则是模糊匹配模式
header:绑定队列与交换器的时候指定一个键值对,当交换器在分发消息的时候胡先解开消息体里面的headers数据,然后判断里面是否有所设置的键值对,如果发现匹配成功,才将消息分发到队列中。性能较差
RabbitMQ的普通集群模式RabbitMQ中单个节点broker中存在着三个信息:
exchange:交换机,就是一张表,维护了路由键到queue的关系
queue:存放消息的容器
msg:消息内容
这里我们模拟集群中有3个节点,普通集群模式下,每个节点上存储的元数据是一样的。
元数据
- 队列元数据:队列名称和它的属性
- 交换器元数据:交换器名称、类型和属性
- 绑定元数据:一张简单的表展示了如何将消息路由到队列
- vhost元数据:就是一个broker,为vhost内的队列、交换器和绑定提供命名空间和安全属性
元数据每个节点都存了一份,是冗余的。消息的内容并没有每个节点都存,例如client1连节点1,那么queue1的消息内容只会存在节点1,不会同步到其他节点。所以某个节点宕机,就保证不了高可用。
同步元数据,这样每个节点都可以对外服务,想去消费其他queue时可以通过路由表去转发对应的请求。
为什么只同步元数据: - 存储空间考虑,每一个节点都保存全量数据会影响消息堆积能力
- 性能考虑,消息的发布者需要将消息复制到每一个集群节点
客户端连接的是非队列数据所在节点:则该节点会进行路由转发,包括发送和消费
集群节点类型: - 磁盘节点:将配置信息和元信息存储在磁盘上
- 内存节点:将配置信息和元信息存储在内存上,性能优于磁盘节点,依赖磁盘节点进行持久化
RabbitMQ要求集群中至少有一个磁盘节点,当节点加入和离开集群时,必须通知磁盘节点(如果集群中唯一的磁盘节点崩溃,则不能进行创建队列、创建交换器、创建绑定、添加用户、更改权限、添加和删除集群节点)。如果唯一磁盘的磁盘节点崩溃,集群是可以保持运行的,但是不能更改任何东西。因此建议在集群中设置两个磁盘节点,只要一个正常,系统就能正常工作。
基于集群模式才能设置镜像队列,要想实现高可用的话就必须使用集群+镜像队列的模式
整个队列,称为AMQPQueue包含四个部分:Queue、mirror_queue_master/slave、blockingQueue、GM
mirror_queue_master/slave负责消息的处理的进程, *** 作blockingQueue。blockingQueue是真正用来储存消息的
Queue负责AMQP协议(commit、rollback、ack等)
master负责处理读写,slave只做备份
GM负责消息的广播,所有的GM组成gm_group,形成链表结构,负责监听相邻节点的状态,以及传递消息到相邻节点(传给下一个节点,直到发送该消息的节点收到该消息,说明整个环路都走完了),master的GM收到消息时代表消息同步完成。
当master挂掉了,整个GM里面存在时间最长的slave(也意味着与master同步最多)将晋升为master。
GM不负责 *** 作blockingQueue,所以在接收到同步过来的消息时,会交由slave进程 *** 作
RabbitMQ持久化分为三个方面:
1、交换器持久化:exchange_declare创建交换器的时候通过参数指定
2、队列持久化:queue_declare创建队列时通过参数指定
3、消息持久化:new AMQPMessage创建消息时通过参数指定
持久化的时候是按照append的方式去写文件,会根据大小自动生成新的文件(例如一个log是16M,满了之后就会写新的log文件)。rabbitmq在启动的时候会创建两个进程,一个负责持久化消息的存储,另一个负责非持久化消息的存储(内存不够时)
消息存储时会在ets表中记录消息在文件中的映射以及相关信息(包括id、偏移量、有效数据、左边文件、右边文件),消息读取时根据该信息到文件中读取,同时更新信息。
消息删除时只从ets删除,变为垃圾数据,当垃圾数据超出比例(默认为50%),并且文件数达到3个,触发垃圾回收,锁定左右两个文件,整理左边文件有效数据,将右边文件有效数据写入左边,更新文件信息,删除右边,完成合并。当一个文件的有用数据等于0时,删除该文件。
写入文件前先写buffer缓冲区,如果buffer已经满了,则写入文件(此时知识 *** 作系统的页存)。每隔25ms刷一次磁盘,不管buffer满没满都将buffer和页存的数据落盘。每次消息写入后,如果没有后续写入请求,则直接刷盘。
通过对channel的设置实现
1、channel.txSelect():通知服务器开启事务模式,服务端会返回Tx.Select.Ok
2、channel.basicPublish:发送消息,可以是多条可以是消费消息提交ack
3、channel.txCommit():提交事务
4、channel.txRollback():回滚事务
消费者使用事务:
1、autoAck = false,手动提交ack,以事务提交或回滚为准
2、autoAck = true,不支持事务,即使再收到消息后再回滚事务也是于事无补的,队列已经把消息移除了
如果其中任意一个环节出现问题,就会抛出IoException异常,用户可以拦截异常进行事务回滚,或决定要不要重复消息
事务消息会降低RabbitMQ的性能,因为每一条消息都意味着好几次连接
1、使用事务消息
2、使用消息的确认机制(即ack)
发送方确认发送出去:
- 将channel设置为/confirm/i模式,则从该channel上发出的每条消息都会被分配一个唯一id
- 消息投递成功后,channel会发送ack给生产者,包含了id,回调/confirm/iCallback接口(该接口是异步的)
- 如果发生错误导致消息丢失,发送nack给生产者,回调ReturnCallback接口
- ack和nack只有一个触发,且只有一次,异步触发,可以继续发送消息
发送到MQ之后,做了持久化之后数据才会可靠。
接收方确认消费完了:
- 声明队列时,指定noack = false,broker会等待消费者手动返回ack,才会从磁盘或者内存中删除消息,否则立刻删除
- broker的ack没有超时机制,只会判断链接是否断开,如果断开,消息会被重新发送
- 如果ack没有提交,那么broker中的该消息就不会被删除,所以消费者接受每一条消息后都必须进行确认
- 如果消费者返回ack之前断开了连接MQ的broker会重新分发给下一个订阅的消费者(可能存在消息重复消费的隐患)
死信队列里面放的是死信消息,下面是死信消息产生的原因:
1、消息被消费方否定确认,使用channel.basicNack或channel.basicReject,并且此时requeue属性被设置为false,表示直接丢弃(requeue为true的话会重复投递)
2、消息在队列的存活时间超过设置的TTL时间
3、消息队列的消息数量已经超过最大队列长度
如果满足上面条件,那么该消息将成为死信消息,如果配置了死信队列信息,那么该消息将会被丢入死信队列中,如果没有配置,则该消息将会被丢弃
为每个需要使用死信队列的业务队列配置一个死信交换机,同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的routeKey,死信队列只不过是绑定在死信交换机上的队列。
TTL:一条消息或者该该队列中所有消息的最大存活时间
如果一条消息设置了TTL属性或者进入设置TTL属性的队列,那么这条消息在TTL设置的时间内没有被消费,则会成为死信,如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用
从之前的架构设计来看,生产者先把消息发到交换器,然后交换器根据匹配规则将消息发送给队列,实际上生产者也是可以直接把消息发给队列的,但是正常不这样做,会丧失灵活性(一对一与一对多都可),直连的话只能是一对一了。
下面是实现方式以及参数说明
声明Queue的参数说明
该架构参考了kafka,NameServer类似于kafka中的zookeeper,queue类似于kafka中的partition。
kafka中,zk本身存在主从,主从之间也会有数据同步。NameServer则是一个去中心化的结构,每个NameServer之间互相独立,不进行互相通信。只要NameServer存在一个可用节点,那么NameServer就是可用的,它的作用主要就是为了维护路由信息,发送者是谁->发给哪个topic的哪个queue、broker是哪一个->消费者是谁。
注意这里的queue是不存在主从的,而kafka的partition是存在主从的。所以RocketMQ里面的queue是冗余的,有n个broker,就会冗余n-1个数据。这样的好处体现在负载均衡上,如果broker1宕机了,生产者queue1连不上,之前可能会去连queue2,但是此时它会直接去连接broker2的queue1,提高成功率,
每一个Broker要和每一个NameServer建立长连接,底层是由netty维护通信,broker会定期地将自己地topic信息注册到NameServer里。
生产者首先需要连接NameServer,去拉取topic所属地broker,然后直连broker,发送消息到topic的dqueue里面去。
消费者也是需要连接NameServer,去拉取topic所属地broker,然后直连broker,从topic的queue里面获取消息进行消费。
与broker的持久化相关的涉及到三个日志文件:
CommitLog:存储的具体的消息内容,但是不区分topic,是顺序读写
ConsumeQueue:是commitlog基于topic的索引文件,所以是先根据topic到这个文件里面找索引,然后拿着索引去CommitLog里面找具体内容,顺序存储
IndexFile:通过key或时间区间来建立索引,也是commitlog的索引文件
- commitlog:日志数据文件,被所有的queue共享,1G,写满之后重新生成,顺序写
- consumeQueue:逻辑queue,消息先到到commitlog,然后异步转发到consumeQueue,包含queue在commitlog种的物理位置偏移量offset,消息实体内容的大小和Message Tag的hash值。大小约为600W个字节,写满之后重新生成,顺序写
- indexFile:通过key或者时间区间来查找commitlog种的消息,文件名以创建的时间戳命名,固定的单个indexFile大小为400M,可以保存2000W个索引
所有队列共用一个日志数据文件,避免了kafka分区数过多、日志文件过多导致磁盘IO读写压力较大造成性能瓶颈。rocketmq的queue只存储少量数据、更加轻量化,对于磁盘的访问时串行化避免磁盘竞争,缺点在于:写入是顺序写,读是随机读,先读consumeQueue,再读commitlog会降低消息读的效率。
消息发送到broker之后,会被写入commitlog,写之前加锁,保证顺序写入,然后转发到consumeQueue。
消息消费时先从consumeQueue读取消息在Commitlog中的起始物理偏移量offset,消息大小和消息Tag的HashCode值,在从commitlog读取消息内容
- 同步刷盘,消息持久化到磁盘才会给生产者返回ack,可以保证消息可靠、但是回影响性能
- 异步刷盘,消息写入pagecache就返回ack给生产者,刷盘采用异步线程,降低读写延迟提高性能和吞吐
默认是不能保证的,需要程序保证发送和消费的是同一个queue,多线程消费也无法保证
发送顺序:发送端自己的业务逻辑保证先后,发往一个固定的queue,生产者可以在消息体上设置消息顺序
发送者实现MessageQueueSelector接口,选择一个queue进行发送,也可以使用rocketmq提供的默认实现:
- SelectMessageQueueByHash:按参数的hashcode与可选队列进行求余选择
- SelectMessageQueueByRandom:随机选择
mq:queue本身就是顺序追加写,只需要保证一个队列同一时间只有一个consumer消费,通过加锁实现,consumer上的顺序消费有一个定时任务、每隔一定时间向broker发送请求延长锁定
消费端:
pull模式:消费者需要自己维护需要拉取的queue,一次拉取的消息都是顺序的,需要消费端自己保证顺序消费
push模式:消费实例实现自己的MQPushConsumer接口,提供注册监听的方法消费消息,registerMessageListener、重载方法。
- MessageListenerConcurrently:并行消费
- MessageListenerOrderly:串行消费,consumer会把消息放入本地队列并加锁,定时任务保证锁的同步
RocketMQ由NameServer集群、Producer集群、Consumer集群、Broker集群组成,消息生产和消费的大致原理如下:
1、Broker在启动的时候向所有的NameServer注册,并保持长连接,每30s发送一次心跳
2、Producer在发送消息的时候从NameServer获取Broker服务器地址,根据负载均衡算法选择一台服务器来发送消息
3、Consumer消费消息的时候同样从NameServer获取Broker地址,然后主动拉取消息来消费
RocketMQ如何保证不丢失消息生产者:
- 同步阻塞的方式发送消息,加上失败重试机制,可能broker存储失败,可以通过查询确认
- 异步发送需要重写回调方法,检查发送结果
- ack机制,可能存储commitlog,存储consumerQueue失败,此时对消费者不可见
broker:同步刷盘、集群模式下采用同步复制、会等待slave复制完成才会返回确认
消费者:
- offset手动提交,消息消费保证幂等
好的方式:
1、从整体到细节,从业务场景到技术实现
2、以现有产品为基础
实现:
1、先实现一个单机的先进先出的数据结构,对message设计封装。要高效、可扩展以及收缩
2、将单机队列扩展成为分布式队列,涉及到分布式集群管理,如zookeeper、NameServer
3、基于Topic定制路由策略(从生产者到消费者的完整链路): 发送者路由策略、消费者与队列对应关系、消费者路由策略
4、实现高效的网络通信。-> Netty、Http
5、规划日志文件,实现文件高效读写(零拷贝+顺序写)服务重启后,快速还原运行现场
6、定制高级功能,死信队列、延迟队列、事务消息等等。(需要贴合业务实际)
参考:
如何设计一个MQ
如何进行产品选型kafka:
优点:吞吐量非常大,性能非常好,集群高可用
缺点:会丢失数据,功能单一。不具备死信队列等高级功能
使用场景:数据量大,频繁,且允许丢失数据:日志分析、大数据采集
RabbitMQ:
优点:消息可靠性高,功能全面
缺点:吞吐量比较低,并发性不高,消息积累会严重影响性能。适合在消息来了立马消费的场景使用。erlang开发,语言不好定制
使用场景:小规模场景
RocketMQ:
优点:高吞吐,高性能,高可用,功能全面的
缺点:开源版本功能不如云上商业版本。官方文档和周边生态不成熟。客户端只支持java
使用场景:几乎是全场景
参考链接:https://rocketmq.apache.org/docs/order-example/
这个知识点是有一个背景的,在rocketmq里面,有一个完善的机制,在产品层面上对消息进行顺序的保证,而kafka与rabbitmq是没有这样的设计的。
消息顺序分为全局有序和局部有序,MQ只需要保证局部有序,不需要保证全局有序。保证一个窗口内的消息是有序的,多个窗口之间的消息有序没有业务意义。例如一个订单,有许多处理步骤,这些步骤是不能乱的 ,消息必须是从上往下进行消费。订单与订单之间消息可以不是有序的,没有必要等到1号订单发完再发2号订单。
参考
1天刷完面试核心45问消息队列面试题(Kafka&RabbitMQ&RocketMQ)
44讲
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)