Zookeeper 在 Kafka 中的作用

Zookeeper 在 Kafka 中的作用,第1张

如上图所示,kafaka集群的 broker,和 Consumer 都需要连接 Zookeeper。
Producer 直接连接 Broker。

Producer 把数据上传到 Broker,Producer可以指定数据有几个分区、几个备份。上面的图中,数据有两个分区 0、1,每个分区都有自己的副本:0'、 1'。

的分区为 leader,白色的为 follower。

leader 处理 partition 的所有读写请求,与此同时,follower会被动定期地去复制leader上的数据。 如下图所示,红色的为 leader,绿色的为 follower,leader复制自己到其他 Broker 中:

Topic 分区被放在不同的 Broker 中,保证 Producer 和 Consumer 错开访问 Broker,避免访问单个 Broker造成过度的IO压力,使得负载均衡。

Broker是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的Broker管理起来 ,此时就使用到了Zookeeper。在Zookeeper上会有一个专门 用来进行Broker服务器列表记录节点

/brokers/ids

每个Broker在启动时,都会到Zookeeper上进行注册,即到/brokers/ids下创建属于自己的节点,如/brokers/ids/[0N]。

Kafka使用了全局唯一的数字来指代每个Broker服务器,不同的Broker必须使用不同的Broker ID进行注册,创建完节点后, 每个Broker就会将自己的IP地址和端口信息记录 到该节点中去。其中,Broker创建的节点类型是临时节点,一旦Broker宕机,则对应的临时节点也会被自动删除。

在Kafka中,同一个 Topic的消息会被分成多个分区 并将其分布在多个Broker上, 这些分区信息及与Broker的对应关系 也都是由Zookeeper在维护,由专门的节点来记录,如:

/borkers/topics

Kafka中每个Topic都会以/brokers/topics/[topic]的形式被记录,如/brokers/topics/login和/brokers/topics/search等。Broker服务器启动后,会到对应Topic节点(/brokers/topics)上注册自己的Broker ID并写入针对该Topic的分区总数,如/brokers/topics/login/3->2,这个节点表示Broker ID为3的一个Broker服务器,对于"login"这个Topic的消息,提供了2个分区进行消息存储,同样,这个分区节点也是临时节点。

由于同一个Topic消息会被分区并将其分布在多个Broker上,因此, 生产者需要将消息合理地发送到这些分布式的Broker上 ,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。

(1) 四层负载均衡,根据生产者的IP地址和端口来为其确定一个相关联的Broker。通常,一个生产者只会对应单个Broker,然后该生产者产生的消息都发往该Broker。这种方式逻辑简单,每个生产者不需要同其他系统建立额外的TCP连接,只需要和Broker维护单个TCP连接即可。但是,其无法做到真正的负载均衡,因为实际系统中的每个生产者产生的消息量及每个Broker的消息存储量都是不一样的,如果有些生产者产生的消息远多于其他生产者的话,那么会导致不同的Broker接收到的消息总数差异巨大,同时,生产者也无法实时感知到Broker的新增和删除。

(2) 使用Zookeeper进行负载均衡,由于每个Broker启动时,都会完成Broker注册过程,生产者会通过该节点的变化来动态地感知到Broker服务器列表的变更,这样就可以实现动态的负载均衡机制。

与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费者分组包含若干消费者, 每条消息都只会发送给分组中的一个消费者 ,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。

消费组 (Consumer Group):
consumer group 下有多个 Consumer(消费者)。
对于每个消费者组 (Consumer Group),Kafka都会为其分配一个全局唯一的Group ID,Group 内部的所有消费者共享该 ID。订阅的topic下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其他group)。
同时,Kafka为每个消费者分配一个Consumer ID,通常采用"Hostname:UUID"形式表示。

在Kafka中,规定了 每个消息分区 只能被同组的一个消费者进行消费 ,因此,需要在 Zookeeper 上记录 消息分区 与 Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上,例如:

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]

其中,[broker_id-partition_id]就是一个 消息分区 的标识,节点内容就是该 消息分区 上 消费者的Consumer ID。

在消费者对指定消息分区进行消息消费的过程中, 需要定时地将分区消息的消费进度Offset记录到Zookeeper上 ,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。Offset在Zookeeper中由一个专门节点进行记录,其节点路径为:

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]

节点内容就是Offset的值。

消费者服务器在初始化启动时加入消费者分组的步骤如下

注册到消费者分组。每个消费者服务器启动时,都会到Zookeeper的指定节点下创建一个属于自己的消费者节点,例如/consumers/[group_id]/ids/[consumer_id],完成节点创建后,消费者就会将自己订阅的Topic信息写入该临时节点。

对 消费者分组 中的 消费者 的变化注册监听 。每个 消费者 都需要关注所属 消费者分组 中其他消费者服务器的变化情况,即对/consumers/[group_id]/ids节点注册子节点变化的Watcher监听,一旦发现消费者新增或减少,就触发消费者的负载均衡。

对Broker服务器变化注册监听 。消费者需要对/broker/ids/[0-N]中的节点进行监听,如果发现Broker服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者负载均衡。

进行消费者负载均衡 。为了让同一个Topic下不同分区的消息尽量均衡地被多个 消费者 消费而进行 消费者 与 消息 分区分配的过程,通常,对于一个消费者分组,如果组内的消费者服务器发生变更或Broker服务器发生变更,会发出消费者负载均衡。

以下是kafka在zookeep中的详细存储结构图:

早期版本的 kafka 用 zk 做 meta 信息存储,consumer 的消费状态,group 的管理以及 offse t的值。考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中确实逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖

[TOC]

本文是 Pulsar 技术系列中的一篇,主要简单梳理了 Pulsar 消息存储与 BookKeeper 存储文件的清理机制。其中,BookKeeper 可以理解为一个 NoSQL 的存储系统,默认使用 RocksDB 存储索引数据。

Pulsar 的消息存储在 BookKeeper 中,BookKeeper 是一个胖客户的系统,客户端部分称为 BookKeeper,服务器端集群中的每个存储节点称为 bookie。Pulsar 系统的 broker 作为 BookKeeper 存储系统的客户端,通过 BookKeeper 提供的客户端 SDK 将 Pulsar 的消息存储到 bookies 集群中。

Pulsar 中的每个 topic 的每个分区(非分区 topic,可以按照分区 0 理解,分区 topic 的编号是从 0 开始的),会对应一系列的 ledger,而每个 ledger 只会存储对应分区下的消息。对于每个分区同时只会有一个 ledger 处于 open 即可写状态。

Pulsar 在生产消息,存储消息时,会先找到当前分区使用的 ledger,然后生成当前消息对应的 entry ID,entry ID 在同一个 ledger 内是递增的。非批量生产的情况(producer 端可以配置这个参数,默认是批量的),一个 entry 中包含一条消息。批量方式下,一个 entry 可能包含多条消息。而 bookie 中只会按照 entry 维度进行写入、查找、获取。

因此,每个 Pulsar 下的消息的 msgID 需要有四部分组成(老版本由三部分组成),分别为(ledgerID,entryID,partition-index,batch-index),其中,partition-index 在非分区 topic 的时候为 -1,batch-index 在非批量消息的时候为 -1。

每个 ledger,当存在的时长或保存的 entry 个数超过阈值后会进行切换,同一个 partition 下的,新的消息会存储到下一个 ledger 中。Ledger 只是一个逻辑概念,是数据的一种逻辑组装维度,并没有对应的实体。

BookKeeper 集群中的每个 bookie 节点收到消息后,数据会分三部分进行存储处理,分别为:journal 文件、entryLog 文件、索引文件。

其中 journal 文件,entry 数据是按照 wal 方式写入的到 journal 文件中,每个 journal 文件有大小限制,当超过单个文件大小限制的时候会切换到下一个文件继续写,因为 journal 文件是实时刷盘的,所以为了提高性能,避免相互之间的读写 IO 相互影响,建议存储目录与存储 entrylog 的目录区分开,并且给每个 journal 文件的存储目录单独挂载一块硬盘(建议使用 ssd 硬盘)。journal 文件只会保存保存几个,超过配置个数的文件将会被删除。entry 存储到 journal 文件完全是随机的,先到先写入,journal 文件是为了保证消息不丢失而设计的。

如下图所示,每个 bookie 收到增加 entry 的请求后,会根据 ledger id 映射到存储到那个 journal 目录和 entry log 目录,entry 数据会存储在对应的目录下。目前 bookie 不支持在运行过程中变更存储目录(使用过程中,增加或减少目录会导致部分的数据查找不到)。

如下图所示,bookie 收到 entry 写入请求后,写入 journal 文件的同时,也会保存到 write cache 中,write cache 分为两部分,一部分是正在写入的 write cache, 一部分是正在正在刷盘的部分,两部分交替使用。

write cache 中有索引数据结构,可以通过索引查找到对应的 entry,write cache 中的索引是内存级别的,基于 bookie 自己定义的 ConcurrentLongLongPairHashMap 结构实现。

另外,每个 entorylog 的存储目录,会对应一个 SingleDirectoryDbLedgerStorage 类实例对象,而每个 SingleDirectoryDbLedgerStorage 对象里面会有一个基于 RocksDB 实现的索引结构,通过这个索引可以快速的查到每个 entry 存储在哪个 entrylog 文件中。每个 write cache 在增加 entry 的时候会进行排序处理,在同一个 write cache,同一个 ledger 下的数据是相邻有序的,这样在 write cache 中的数据 flush 到 entrylog 文件时,使得写入到 entrylog 文件中的数据是局部有序的,这样的设计能够极大的提高后续的读取效率。

SingleDirectoryDbLedgerStorage 中的索引数据也会随着 entry 的刷盘而刷盘到索引文件中。在 bookie 宕机重启时,可以通过 journal 文件和 entry log 文件还原数据,保证数据不丢失。

Pulsar consumer 在消费数据的时候,做了多层的缓存加速处理,如下图所示:

获取数据的顺序如下:

上面每一步,如果能获取到数据,都会直接返回,跳过后面的步骤。如果是从磁盘文件中获取的数据,会在返回的时候将数据存储到 read cache 中,另外如果是读取磁盘的 *** 作,会多读取一部分磁盘上的时候,因为存储的时候有局部有序的处理,获取相邻数据的概率非常大,这种处理的话会极大的提高后续获取数据的效率。

我们在使用的过程中,应尽量避免或减少出现消费过老数据即触发读取磁盘文件中的消息的场景,以免对整体系统的性能造成影响。

BookKeeper 中的每个 bookie 都会周期的进行数据清理 *** 作,默认 15 分钟检查处理一次,清理的主要流程如下:

通过上面的流程,我们可以了解 bookie 在清理 entrylog 文件时的大体流程。

需要特别说明的是,ledger 是否是可以删除的,完全是客户端的触发的,在 Pulsar 中是 broker 触发的。

broker 端有周期的处理线程(默认 2 分钟),清理已经消费过的消息所在的 ledger 机制,获取 topic 中包含的 cursor 最后确认的消息,将这个 topic 包含的 ledger 列表中,在这个 id 之前的(注意不包含当前的 ledger id)全部删除(包括 zk 中的元数据,同时通知 bookie 删除对应的 ledger)。

在运用的过程中我们多次遇到了 bookie 磁盘空间不足的场景,bookie 中存储了大量的 entry log 文件。比较典型的原因主要有如下两个。

原因一:

生产消息过于分散,例如,举个极端的场景,1w 个 topic,每个 topic 生产一条,1w 个 topic 顺序生产。这样每个 topic 对应的 ledger 短时间内不会因为时长或者存储大小进行切换,active 状态的 ledger id 分散在大量的 entry log 文件中。这些 entry log 文件是不能删除或者及时压缩的。

如果遇到这种场景,可以通过重启,强制 ledger 进行切换进行处理。当然如果这个时候消费进行没有跟上,消费的 last ack 位置所在的 ledger 也是处于 active 状态的,不能进行删除。

原因二:

GC 时间过程,如果现存的 enrylog 文件比较多,且大量符合 minor 或 major gc 阈值,这样,单次的 minor gc 或者 major gc 时间过长,在这段时间内是不能清理过期的 entry log 文件。

这是由于单次清理流程的顺序执行导致的,只有上次一轮执行完,才会执行下一次。目前,这块也在提优化流程,避免子流程执行实现过长,对整体产生影响。

计算图级-------是 ROS 处理数据的一种点对点的网络形式。程序运行时,所有进程及它们所进行的数据处理,将会通过一种点对点的网络形式表现出来,即通过节点、节点管理器、话题、服务等来进行表现。
ROS 中的基本计算图级概念包括:节点、节点管理器、参数服务器、消息、服务、话题和包。这些概念以各种形式来提供数据。

ros 命令的说明及参数可以通过 < 命令 > -h (或 --help )来查看
例如: rosnode -h

用键盘控制小海龟运动这一过程的通信机制是怎样的呢其实,这两个节点是通过一个 ROS 话题( Topic )来相互通信的, turtle_teleop_key 在这个话题上发布键盘输入的的消息,而 turtlesim 则订阅该话题以接收该消息。下面通过 rqt 功能包和 rostopic 命令来查看相关信息:
2, rostopic list :能够列出所有当前订阅和发布的话题。
先看一下 rostopic list 子命令需要哪些参数。运行命令:
rostopic list -h
使用 verbose 选项,以列出相关话题的详细信息。运行命令:
rostopic list -v
显示了有关所发布和订阅的话题的详细信息,其中方括号中表示的是话题的类型。
3, rostopic type :用来查看所发布话题的消息类型。
用法: rostopic type [topic] 运行如下命令:
rostopic type /turtle1/cmd_vel
上面的 geometry_msgs/Twist 即为话题 /turtle1/cmd_vel 的消息类型,这在执行命令 rostopic list -v 时
也有所体现。
下面用 rosmsg 命令来查看消息的详细情况:
rosmsg show geometry_msgs/Twist
4, rostopic pub :把数据发布到当前某个正在广播的话题上。通过此命令可以通过直接在终端发送命令来控制小海龟
用法: rostopic pub [topic] [msg_type] [args]

rostopic pub /turtle1/cmd_vel geometry_msgs/Twist -r 1 -- '[20, 00, 00]' '[00, 00, 18]'


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zz/13422633.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-08-02
下一篇 2023-08-02

发表评论

登录后才能评论

评论列表(0条)

保存