Kafka核心组件之控制器和协调器

Kafka核心组件之控制器和协调器,第1张

[TOC]

我们已经知道Kafka的集群由n个的broker所组成,每个broker就是一个kafka的实例或者称之为kafka的服务。其实控制器也是一个broker,控制器也叫leader broker。

他除了具有一般broker的功能外,还负责分区leader的选取,也就是负责选举partition的leader replica。

kafka每个broker启动的时候,都会实例化一个KafkaController,并将broker的id注册到zookeeper,集群在启动过程中,通过选举机制选举出其中一个broker作为leader,也就是前面所说的控制器。

包括集群启动在内,有三种情况触发控制器选举:

1、集群启动

2、控制器所在代理发生故障

3、zookeeper心跳感知,控制器与自己的session过期

按照惯例,先看图。我们根据下图来讲解集群启动时,控制器选举过程。

假设此集群有三个broker,同时启动。

(一)3个broker从zookeeper获取/controller临时节点信息。/controller存储的是选举出来的leader信息。此举是为了确认是否已经存在leader。

(二)如果还没有选举出leader,那么此节点是不存在的,返回-1。如果返回的不是-1,而是leader的json数据,那么说明已经有leader存在,选举结束。

(三)三个broker发现返回-1,了解到目前没有leader,于是均会触发向临时节点/controller写入自己的信息。最先写入的就会成为leader。

(四)假设broker 0的速度最快,他先写入了/controller节点,那么他就成为了leader。而broker1、broker2很不幸,因为晚了一步,他们在写/controller的过程中会抛出ZkNodeExistsException,也就是zk告诉他们,此节点已经存在了。

经过以上四步,broker 0成功写入/controller节点,其它broker写入失败了,所以broker 0成功当选leader。

此外zk中还有controller_epoch节点,存储了leader的变更次数,初始值为0,以后leader每变一次,该值+1。所有向控制器发起的请求,都会携带此值。如果控制器和自己内存中比较,请求值小,说明kafka集群已经发生了新的选举,此请求过期,此请求无效。如果请求值大于控制器内存的值,说明已经有新的控制器当选了,自己已经退位,请求无效。kafka通过controller_epoch保证集群控制器的唯一性及 *** 作的一致性。

由此可见,Kafka控制器选举就是看谁先争抢到/controller节点写入自身信息。

控制器的初始化,其实是初始化控制器所用到的组件及监听器,准备元数据。

前面提到过每个broker都会实例化并启动一个KafkaController。KafkaController和他的组件关系,以及各个组件的介绍如下图:

图中箭头为组件层级关系,组件下面还会再初始化其他组件。可见控制器内部还是有些复杂的,主要有以下组件:

1、ControllerContext,此对象存储了控制器工作需要的所有上下文信息,包括存活的代理、所有主题及分区分配方案、每个分区的AR、leader、ISR等信息。

2、一系列的listener,通过对zookeeper的监听,触发相应的 *** 作,的框的均为listener

3、分区和副本状态机,管理分区和副本。

4、当前代理选举器ZookeeperLeaderElector,此选举器有上位和退位的相关回调方法。

5、分区leader选举器,PartitionLeaderSelector

6、主题删除管理器,TopicDeletetionManager

7、leader向broker批量通信的ControllerBrokerRequestBatch。缓存状态机处理后产生的request,然后统一发送出去。

8、控制器平衡 *** 作的KafkaScheduler,仅在broker作为leader时有效。

Kafka集群的一些重要信息都记录在ZK中,比如集群的所有代理节点、主题的所有分区、分区的副本信息(副本集、主副本、同步的副本集)。每个broker都有一个控制器,为了管理整个集群Kafka选利用zk选举模式,为整个集群选举一个“中央控制器”或”主控制器“,控制器其实就是一个broker节点,除了一般broker功能外,还具有分区首领选举功能。中央控制器管理所有节点的信息,并通过向ZK注册各种监听事件来管理整个集群节点、分区的leader的选举、再平衡等问题。外部事件会更新ZK的数据,ZK中的数据一旦发生变化,控制器都要做不同的响应处理。

故障转移其实就是leader所在broker发生故障,leader转移为其他的broker。转移的过程就是重新选举leader的过程。

重新选举leader后,需要为该broker注册相应权限,调用的是ZookeeperLeaderElector的onControllerFailover()方法。在这个方法中初始化和启动了一系列的组件来完成leader的各种 *** 作。具体如下,其实和控制器初始化有很大的相似度。

1、注册分区管理的相关监听器

2、注册主题管理的相关监听

3、注册代理变化监听器

4、重新初始化ControllerContext,

5、启动控制器和其他代理之间通信的ControllerChannelManager

6、创建用于删除主题的TopicDeletionManager对象,并启动。

7、启动分区状态机和副本状态机

8、轮询每个主题,添加监听分区变化的PartitionModificationsListener

9、如果设置了分区平衡定时 *** 作,那么创建分区平衡的定时任务,默认300秒检查并执行。

除了这些组件的启动外,onControllerFailover方法中还做了如下 *** 作:

1、/controller_epoch值+1,并且更新到ControllerContext

2、检查是否出发分区重分配,并做相关 *** 作

3、检查需要将优先副本选为leader,并做相关 *** 作

4、向kafka集群所有代理发送更新元数据的请求。

下面来看leader权限被取消时,调用的方法onControllerResignation

1、该方法中注销了控制器的权限。取消在zookeeper中对于分区、副本感知的相应监听器的监听。

2、关闭启动的各个组件

3、最后把ControllerContext中记录控制器版本的数值清零,并设置当前broker为RunnignAsBroker,变为普通的broker。

通过对控制器启动过程的学习,我们应该已经对kafka工作的原理有了了解, 核心是监听zookeeper的相关节点,节点变化时触发相应的 *** 作

有新的broker加入集群时,称为代理上线。反之,当broker关闭,推出集群时,称为代理下线。

代理上线:

1、新代理启动时向/brokers/ids写数据

2、BrokerChangeListener监听到变化。对新上线节点调用controllerChannelManageraddBroker(),完成新上线代理网络层初始化

3、调用KafkaControlleronBrokerStartup()处理

35恢复因新代理上线暂停的删除主题 *** 作线程

代理下线:

1、查找下线节点集合

2、轮询下线节点,调用controllerChannelManagerremoveBroker(),关闭每个下线节点网络连接。清空下线节点消息队列,关闭下线节点request请求

3、轮询下线节点,调用KafkaControlleronBrokerFailure处理

4、向集群全部存活代理发送updateMetadataRequest请求

顾名思义,协调器负责协调工作。本节所讲的协调器,是用来协调消费者工作分配的。简单点说,就是消费者启动后,到可以正常消费前,这个阶段的初始化工作。消费者能够正常运转起来,全有赖于协调器。

主要的协调器有如下两个:

1、消费者协调器(ConsumerCoordinator)

2、组协调器(GroupCoordinator)

kafka引入协调器有其历史过程,原来consumer信息依赖于zookeeper存储,当代理或消费者发生变化时,引发消费者平衡,此时消费者之间是互不透明的,每个消费者和zookeeper单独通信,容易造成羊群效应和脑裂问题。

为了解决这些问题,kafka引入了协调器。服务端引入组协调器(GroupCoordinator),消费者端引入消费者协调器(ConsumerCoordinator)。每个broker启动的时候,都会创建GroupCoordinator实例,管理部分消费组(集群负载均衡)和组下每个消费者消费的偏移量(offset)。每个consumer实例化时,同时实例化一个ConsumerCoordinator对象,负责同一个消费组下各个消费者和服务端组协调器之前的通信。如下图:

消费者协调器,可以看作是消费者做 *** 作的代理类(其实并不是),消费者很多 *** 作通过消费者协调器进行处理。

消费者协调器主要负责如下工作:

1、更新消费者缓存的MetaData

2、向组协调器申请加入组

3、消费者加入组后的相应处理

4、请求离开消费组

5、向组协调器提交偏移量

6、通过心跳,保持组协调器的连接感知。

7、被组协调器选为leader的消费者的协调器,负责消费者分区分配。分配结果发送给组协调器。

8、非leader的消费者,通过消费者协调器和组协调器同步分配结果。

消费者协调器主要依赖的组件和说明见下图:

可以看到这些组件和消费者协调器担负的工作是可以对照上的。

组协调器负责处理消费者协调器发过来的各种请求。它主要提供如下功能:

组协调器在broker启动的时候实例化,每个组协调器负责一部分消费组的管理。它主要依赖的组件见下图:

这些组件也是和组协调器的功能能够对应上的。具体内容不在详述。

下图展示了消费者启动选取leader、入组的过程。

消费者入组的过程,很好的展示了消费者协调器和组协调器之间是如何配合工作的。leader consumer会承担分区分配的工作,这样kafka集群的压力会小很多。同组的consumer通过组协调器保持同步。消费者和分区的对应关系持久化在kafka内部主题。

消费者消费时,会在本地维护消费到的位置(offset),就是偏移量,这样下次消费才知道从哪里开始消费。如果整个环境没有变化,这样做就足够了。但一旦消费者平衡 *** 作或者分区变化后,消费者不再对应原来的分区,而每个消费者的offset也没有同步到服务器,这样就无法接着前任的工作继续进行了。

因此只有把消费偏移量定期发送到服务器,由GroupCoordinator集中式管理,分区重分配后,各个消费者从GroupCoordinator读取自己对应分区的offset,在新的分区上继续前任的工作。

下图展示了不提交offset到服务端的问题:

开始时,consumer 0消费partition 0 和1,后来由于新的consumer 2入组,分区重新进行了分配。consumer 0不再消费partition2,而由consumer 2来消费partition 2,但由于consumer之间是不能通讯的,所有consumer2并不知道从哪里开始自己的消费。

因此consumer需要定期提交自己消费的offset到服务端,这样在重分区 *** 作后,每个consumer都能在服务端查到分配给自己的partition所消费到的offset,继续消费。

由于kafka有高可用和横向扩展的特性,当有新的分区出现或者新的消费入组后,需要重新分配消费者对应的分区,所以如果偏移量提交的有问题,会重复消费或者丢消息。偏移量提交的时机和方式要格外注意!!

1、自动提交偏移量

设置 enableautocommit为true,设定好周期,默认5s。消费者每次调用轮询消息的poll() 方法时,会检查是否超过了5s没有提交偏移量,如果是,提交上一次轮询返回的偏移量。

这样做很方便,但是会带来重复消费的问题。假如最近一次偏移量提交3s后,触发了再均衡,服务器端存储的还是上次提交的偏移量,那么再均衡结束后,新的消费者会从最后一次提交的偏移量开始拉取消息,此3s内消费的消息会被重复消费。

2、手动提交偏移量

设置 enableautocommit为false。程序中手动调用commitSync()提交偏移量,此时提交的是poll方法返回的最新的偏移量。

commitSync()是同步提交偏移量,主程序会一直阻塞,偏移量提交成功后才往下运行。这样会限制程序的吞吐量。如果降低提交频次,又很容易发生重复消费。

这里我们可以使用commitAsync()异步提交偏移量。只管提交,而不会等待broker返回提交结果

commitSync只要没有发生不可恢复错误,会进行重试,直到成功。而commitAsync不会进行重试,失败就是失败了。commitAsync不重试,是因为重试提交时,可能已经有其它更大偏移量已经提交成功了,如果此时重试提交成功,那么更小的偏移量会覆盖大的偏移量。那么如果此时发生再均衡,新的消费者将会重复消费消息。

Kafka是高吞吐量低延迟的高并发、高性能的消息中间件,在大数据领域有广泛的应用。那他是如何做到这么高的吞吐量和高性能呢?

生产者通过多batch合并一个request 一次性发送broker提高吞吐量
每个Kafka服务端叫做一个broker,负责管理一台机器上的数据;每个topic拆分成多个partition,这样每个partition存储一部分数据并放在不同的broker上。这时候生产者如果生产一条消息,就建立连接然后发送数据,效率肯定不高。

Kafka会在生产者放一个内存缓冲区,当生产消息后,它会把同一个topic同一个partition的消息打包成一个batch。

这样就结束了吗?当然不是,如果生产者这边有多个topic,不同topic发送到同一个broker的数据是否可以合并呢?当然可以
如果客户端那边有2个topic,每个topic有个3个partition,那每个broker就会有2个partition。这时候我们完全可以将发送到同一个broker的batch一次发送。所以kafka将发送到同一个broker的batch打包成一个request发送,进一步提高了通信的效率。

首先kafka在接收到数据后都会写磁盘,但是我们都知道写磁盘性能会很差,所以它并不是直接写磁盘。 *** 作系统本身有一层缓存叫做页缓存,相当于内存,所以kafka是把数据写入到页缓存中,接下来由页缓存决定什么时候写入到磁盘。

普通机械磁盘如果随机写的话,性能会非常差,Kafka采用磁盘顺序写的方式,仅仅将数据追加到文件末尾,这种方式性能基本可以和内存媲美。

上面说的是kafka写入数据的优秀设计,那从磁盘读数据分发又是如何设计的呢?

我们可以想想分发数据一般是如何做的。首先肯定是从磁盘中读出数据到页缓存,然后从页缓存中拷贝到kafka中,然后再从kafka中拷贝到socket中,最后再给网卡。
而零拷贝技术,就是去除上面2步拷贝,直接从页缓存中将数据发送到网卡中,socket中仅仅只是拷贝一个描述符,这个过程大大提升了读取的性能。而且从磁盘读数据时候会先看看页缓存中有没有,如果有的话都不用读磁盘了。

相信看见题目的同学都会很有疑问,甚至不服气,这都是基于个人对于kafka原理的理解,我可以说磁盘顺序写要比内存的随机读快吧。但是说到底,基于性能的优化方面,还是离不开内存的。

Kafka作为一个支持大数据量写入写出的消息队列,由于是基于Scala和Java实现的,而Scala和Java均需要在JVM上运行,所以如果是基于内存的方式,即JVM的堆来进行数据存储则需要开辟很大的堆来支持数据读写,从而会导致GC频繁影响性能。考虑到这些因素,kafka是使用磁盘存储数据的。

Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。topic存储结构见下图:

由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了 分片 索引 机制,将每个partition分为多个segment。每个 segment对应两个文件——“index”文件和“log”文件。

partition文件夹命名规则

topic 名称+分区序号,举例有一个topic名称文“kafka”,这个topic有三个分区,则每个文件夹命名如下:

index和log文件的命名规则

1)partition文件夹中的第一个segment从0开始,以后每个segement文件以上一个segment文件的最后一条消息的offset+1命名(当前日志中的第一条消息的offset值命名)。

2)数值最大为64位long大小。19位数字字符长度,没有数字用0填充。

举例,有以下三对文件:

以第二个文件为例看下对应的数据结构:

稀疏索引 需要注意下。

消息查找过程

找message-2589,即offset为2589:

1)先定位segment文件,在0000000000000002584中。

2)计算查找的offset在日志文件的相对偏移量 offset - 文件名的数量 = 2589 - 2584 = 5; 在index文件查找第一个参数的值,若找到,则获取到偏移量,通过偏移量到log文件去找对应偏移量的数据即可; 本例中没有找到,则找到当前索引中偏移量的上线最接近的值,即3,偏移量文246;然后到log文件中从偏移量为246数据开始向下寻找。

简单了解了kafka在数据存储方面的知识,线面我们具体分析下为什么kafka基于磁盘却快于内存。

在前面了解存储结构过程中,我们发现kafka记录log日志使用的结尾追加的方式,即 顺序写 。这样要比随机写块很多,这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

mmap,简单描述其就是将磁盘文件映射到内存, 用户通过修改内存就能修改磁盘文件。

即便是顺序写磁盘,磁盘的读写速度任然比内存慢慢的多得多,好在 *** 作系统已经帮我们解决这个问题。在Linux *** 作系统中,Linux会将磁盘中的一些数据读取到内存当中,我们称之为内存页。当需要读写硬盘的时候,都优先在内存页中进行处理。当内存页的数据比硬盘数据多的时候,就形成了 脏页 ,当脏页达到一定数量, *** 作系统会进行 刷脏 ,即将内存也数据写到磁盘。

问题:不可靠,写到 mmap 中的数据并没有被真正的写到硬盘, *** 作系统会在程序主动调用 Flush 的时候才把数据真正的写到硬盘。

零拷贝并不是不需要拷贝,而是减少不必要的拷贝次数,通常使用在IO读写过程中。

传统io过程

如上图所示,上图共经历了四次拷贝的过程:

1)数据到内核态的read buffer;
2)内核态的read buffer到用户态应用层的buffer;
3)用户态到内核态的socket buffer;
4)socket buffer到网卡的buffer(NIC)。

DMA 引入DMA技术,是指外部设备不通过CPU而直接与系统内存交换数据的接口技术,网卡等硬件设备支持DMA技术。

如上图所示,上图共经历了两次拷贝的过程。

sendfile 在内核版本 21 中,引入了 Sendfile 系统调用,以简化网络上和两个本地文件之间的数据传输。同时使用了DMA技术。

如上图所示,上图共经历了一次拷贝的过程。

sendfile( DMA 收集拷贝) 之前我们是把页缓存的数据拷贝到socket缓存中,实际上,我们仅仅需要把缓冲区描述符传到 socket 缓冲区,再把数据长度传过去,这样 DMA 控制器直接将页缓存中的数据打包发送到网络中就可以了。

如上图所示,最后一次的拷贝也被消除了,数据->read buffer->NIC。

kafka通过java和scala实现,而Java对sendfile是通过NIO 的 FileChannel (
javaniochannelsFileChannel )的 transferTo 和 transferFrom 方法实现零拷贝。

注: transferTo 和 transferFrom 并不保证一定能使用零拷贝。实际上是否能使用零拷贝与 *** 作系统相关,如果 *** 作系统提供 sendfile 这样的零拷贝系统调用,则这两个方法会通过这样的系统调用充分利用零拷贝的优势,否则并不能通过这两个方法本身实现零拷贝。

Kafka是高可用的分布式消息系统,首先要解决的就是资源协调分配和多副本状态维护的问题。解决这些问题通常就是两种思路,一是依靠Zookeeper来协调,二是设定一个中心节点,让这个中心节点来协调。如果依靠Zookeeper来协调,会存在大量的竞争条件,对Zookeeper的访问压力增大,而且如果Zookeeper出现了问题(比如网络抖动),系统很容易出现紊乱。Kafka采用的是第二种思路,即选举一个中心节点来进行资源协调与多副本状态维护,这个中心节点被称作Controller(一个特殊的Broker),这个选举过程依靠Zookeeper来完成。
Broker启动时,会竞争创建临时"/controller"。如果创建成功,则成为Controller,并把Broker的id等信息写入这个节点。同时会全程监控"/controller"的数据变化,如果旧的Controller挂掉,则开启新一轮的竞争过程。

Kafka要进行资源协调,第一件需要知道的事情就是各个Broker的存活状态,这个问题利用Zookeeper可以很容易做到。
假设某个Broker,id为0,它启动时,会创建"/brokers/ids/0"临时节点,并把端口等信息写进去。Controller会监控"/brokers/ids"的节点变化,以实时感知各broker的状态,进行资源协调。

在Kafka这个多副本分区的消息系统里,创建一个topic,至少需要以下3个步骤:

Controller的存在,可以很容易完成上面的b和c步骤,但a步骤不行,如果Controller挂掉,则这些信息会不可用。Kafka把这些信息保存在Zookeeper中,依靠其高可用特性来保证这些信息的高可用。假设某个topic名字为mytopic,创建时,其分区信息保存在"/brokers/topics/mytopic"中。Controller全程监控"/brokers/topics"的孩子节点变动,实时感知这些信息,以完成后续步骤。
创建完成之后,后续往往会有分区调整和topic删除等需求。普通青年可能会觉得这两个问题很简单,给Controller发个相关请求就可以了。事实远非如此!
拿分区调整来说,假设某分区有三个副本,分别位于Broker-1、Broker-2和Broker-3,leader为1,现在扩容增加了Broker-4、Broker-5、Broker-6,为了平衡机器间压力,需要将副本1 2 3移到4 5 6,至少经历以下步骤:

以上每个步骤都有可能失败,如何才能保证这次调整顺利进行呢?
首先,我们不能直接修改该分区的副本信息为 4 5 6,原因很简单,需要等待4 5 6的追赶过程以便产生新leader。其次, *** 作未完全成功的命令需要保存下来,如果 *** 作过程中,Controller挂掉,则新的Controller可以从头开始直至成功。
Kafka怎么做的呢?

当然,这里一次只能有一个调整命令,但一个调整命令可以同时调整多个topic的多个分区。
在这个过程中,Zookeeper的作用是: 持久化 *** 作命令并实时通知 *** 作者 ,是不是只有Zookeeper可以做这个事情呢,不是,但Zookeeper可以做得很好,保证命令高可用。
类似的 *** 作还有topic删除,副本的leader变更等,都是沿用上面的套路。

Broker的集群中有全局配置信息,但如果想针对某个topic或者某个client进行配置呢,Kafka把这些信息保存在Zookeeper中,各个Broker实时监控以更新。

脑裂问题是指,在一个设有中心节点的系统中,出现了两个中心节点。两个中心同时传达命令,自然会造成系统的紊乱。
Kafka利用Zookeeper所做的第一件也是至关重要的一件事情是选举Controller,那么自然就有疑问,有没有可能产生两个Controller呢?
首先,Zookeeper也是有leader的,它有没有可能产生两个leader呢?答案是不会。
quorum机制可以保证,不可能同时存在两个leader获得大多数支持。假设某个leader假死,其余的followers选举出了一个新的leader。这时,旧的leader复活并且仍然认为自己是leader,这个时候它向其他followers发出写请求也是会被拒绝的。因为每当新leader产生时,会生成一个epoch,这个epoch是递增的,followers如果确认了新的leader存在,知道其epoch,就会拒绝epoch小于现任leader epoch的所有请求。那有没有follower不知道新的leader存在呢,有可能,但肯定不是大多数,否则新leader无法产生。Zookeeper的写也遵循quorum机制,因此,得不到大多数支持的写是无效的,旧leader即使各种认为自己是leader,依然没有什么作用。

Kafka的Controller也采用了epoch,具体机制如下:

理论上来说,如果Controller的SessionExpired处理成功,则可以避免双leader,但假设SessionExpire处理意外失效的情况:旧Controller假死,新的Controller创建。旧Controller复活,SessionExpired处理 意外失效 ,仍然认为自己是leader。
这时虽然有两个leader,但没有关系,leader只会发信息给存活的broker(仍然与Zookeeper在Session内的),而这些存活的broker则肯定能感知到新leader的存在,旧leader的请求会被拒绝。

每个Broker有一个metaDataCache,缓存有topic和partition的基本信息,可以正常的生产和消费信息,但不能进行topic的创建、调整和删除等 *** 作。
此外,Broker会不断重试连接。

待续

假设Broker数目为B,topic数目为T,所有topic总partition数目为P,Client数目为C,以下数值均为峰值:

零售业:主要集中在客户营销分析上,通过大数据技术可以对客户的消费信息进行分析。获知
客户的消费习惯、消费方向等,以便商场做好更合理商品、货架摆放,规划市场营销方案、产品推荐手段等。
金融业:在金融行业里头,数据即是生命,其信息系统中积累了大量客户的交易数据。通过大数据可以对客户的行为进行分析、防堵诈骗、金融风险分析等。
医疗业:通过大数据可以辅助分析疫情信息,对应做出相应的防控措施。对人体健康的趋势分析在电子病历、医学研发和临床试验中,可提高诊断准确性和药物有效性等。
制造业:该行业对大数据的需求主要体现在产品研发与设计、供应链管理、生产、售后服务等。通过数据分析,在产品研发过程中免除掉一些不必要的步骤,并且及时改善产品的制造与组装的流程。

kafka是分布式消息队列或者叫分布式消息中间件,有时候会叫做一种MQ产品(Message Queue),同类型的有RabbitMQ,ActiveMQ等等。
MQTT是一种即时消息传输协议,Message Queuing Telemetry Transport,也就是一种即时信息传输的一种格式约定,与其类似的有XMPP等,是用来做IM的。
kafka是不支持MQTT协议的,如果非要把它们集成在一起,你要不自己分析,要不去Github上找找,说不定有人做过这样的项目。
两个M的意思,是完全不一样的,kafka的M是指各个服务器或各个进程间传输的消息,而MQTT的M,是指类似MSN,QQ那种IM中那种大家交流的那种消息。

在基于了解或掌握其他同类MQ的基础知识上,怎么比较快速的掌握kafka的核心设计,确保在使用的过程中做到心中有数,做到知其然并知其所以然?本篇文章主要是笔者在已有的rmq的基础上学习kafka的思路以及过程的总结。

ps、rmq指RocketMQ

ps、文章写着写着发现有点长,应该挺乱了……

ps、因为是学习笔记,所以就这样吧,随便看看……

带着问题去学习新的技能,也许会更贴近自己原有的知识储备,也能更好的把新知识纳入自己原有的知识体系并加以补充或者延展,形成更完整的知识脉络。基于原有的rmq的知识体系,在提前梳理了几个相关的,并且浅显的问题,主要是两个方面的内容,一类是MQ模型中生产者客户端的设计与消费者客户端的设计,一类是服务端的总体架构设计。

服务端的总体架构设计

客户端的总体架构设计

相信很多人不管是在面试中,还是在做MQ选型时,都会遇到几个问题,比如Kafka在超过2k的topic时性能会急剧下降,但是rmq在超过2k的topic时性能不存大规模下降,比如Kafka是一个分布式消息队列中间件,而rmq更像一个单机版消息队列中间件等。这些问题的背后,正是两个消息中间件在架构设计上的差异性所导致的,各有优劣势,我们更多的关注设计思路。先看这几个问题。

在rmq服务端写入时,完全是基于commit log 做log append,避免了磁盘的随机读写,再配合零拷贝等技术特性,成为了MQ的高并发利器。而由于RMQ的全量日志都维护在commit log,这也是其余kafka的一个架构设计上的区别。相信初步了解过kafka的同学,都应该知道其设计理念中关于分区与副本的概念,一个topic在集群中存在多个分区,一个分区在集群中存在多个副本,不同的topic之间分区是互不关联的,当单机维护超过2k的topic时,意味着单机存在2k多个分区,即便topic内日志采用log append,那么在高并发写入刷盘时,磁头在这些分区的副本文件上移来移去,性能自然会随之下降,看起来像是‘随机读写’。

这个说法是在一个中间件爱好群里看到大家在讨论时聊到的,感觉相当有意思,这种看法背后又是怎么的逻辑呢?首先,网上能找到的二者大量的对比都是基于单机的对比,集群对比很少。从分区+副本的思路来看,kafka的部署架构看起来是多个broker组成集群,但是内部运转逻辑是分区维度的多副本间高可用,即topic在多个broker之间做高可用的保证,而副本间的运转逻辑是基于zookeeper的ZAB机制。反观rmq最开始的架构确实主从架构,看起来更简单,但是可用性的保证上完全不一样,由于所有的topic都在主节点上,主节点挂了整个集群就运转不下去了,因为只有主可以支持写,所以rmq推荐使用双主架构,后来才引入raft协议支持选举,但依旧是基于broker的选举。二者最大的区别在于,集群中某个节点挂机对于整个集群的影响程度不同,毫无疑问,rmq显得更重。同样的多节点集群中,每个kafka broker都在提供读写能力,因为不同的topic的副本散落在各个broker中,而每个topic的leader副本也会分散在整个集群中,而rmq则不同,所以理论上kafka集群能提供的吞吐量应该会比rmq更高。

从前两个问题,提到了几个很核心的概念,包括分区,副本,而这也是kafka最核心设计内容。kafka的分区这个设计很有意思( 关于kafka分区 ),kafka的集群是一个整体,对于topic而言,分区个数相当于多少个可读写节点,一个分区下存在多个副本组成一个分布式可选主的‘集群’。

如上图,在一个kafka集群中,部署了三个服务端节点,在topic-a创建时,创建了2个分区,3个副本,在这个部署下,提供读写能力的只有broker1节点上1分区的副本,broker2节点上2分区的副本。对于部署节点broker而言并无主次之分,分区与分区间相互独立,分区内副本间组成集群为topic-a : partation-1提供服务。topic 与 分区 可以看做是逻辑概念,副本为物理概念。所以,前文提到弱化broker的概念就在于,它是基于分区提供服务,这个与rmq的设定完全不同,也许是先入为主的关系,又或者在rmq架构中broker的设定更像是mysql的主从设定,rmq的broker理解起来更简单。

那么什么是isr, asr 在说这个之前,先说说对于一条消息而言,kafka理论上应该如何在兼顾一定性能的情况下获取更高的可靠性?请求写入分区1的leader副本,就能保证数据一定不丢失吗?如果此时leader节点宕机发生选举,由于follower节点还没同步leader数据,那是不是一段时间内的数据就丢失了呢?那为了更高的可靠性,是不是可以选择等所有副本都同步到当前消息才算本次写入成功?follower节点的数据时从leader节点复制而来(此处会抽象一个很常见的水位高低的概念,但是还没详细了解,暂时忽略),那如果follower节点的数据跟leader节点的数据很接近的话,那么复制会很快完成,但是如果某个follower节点的数据落后leader的节点很多,等待完全同步需要更长的时间,毫无疑问将会引发灾难性的结果。那么,有没有一种相对均衡,可接受的方案,比如只等待落后leader节点数据量较低的follower节点成功复制就算成功?技术方案的选择往往都是取舍,特别是多副本间的数据一致性的问题。

isr集合,俗称副本同步集。kafka并非是根据副本间数据复制的偏移量来计算集合,而是根据数据同步的时间间隔(参数为 [replicalagtimemaxms](>

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/dianzi/13513286.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-08-22
下一篇 2023-08-22

发表评论

登录后才能评论

评论列表(0条)

保存