Kafka 设计详解之队列

Kafka 设计详解之队列,第1张

在 上文 中我们介绍了 Kafka 的网络通信,本文打算详细分析 Kafka 的核心 — 队列 的设计和实现,来对 Kafka 进行更深一步的了解。

队列是一种先进先出(FIFO)的数据结构,它是 Kafka 中最重要的部分,负责收集生产者生产的消息,并将这些消息传递给消费者。要实现一个队列有多种方式,Kafka 作为一个消息队列中间件,在设计队列时主要要考虑两个问题:

乍一看到这个问题,我们会想,内存的读取速度远快于磁盘,如果追求性能,内存也充足的话,当然是将生产者产生的消息数据写到内存(比如用一个数组或者链表来存储队列数据),供消费者消费。真的是这样吗?
下面我们依次分析下写内存和写磁盘文件的优缺点,首先,内存的优点是读写速度非常快,但是,如果我们的目标是设计「大数据量」下的「高吞吐量」的消息队列,会有以下几个问题:

接下来我们来分析一下磁盘,写磁盘文件方式存储队列数据的优点就是能规避上述内存的缺点,但其有很严重的缺点,就是读写速度慢,如果纯依靠磁盘,那消息队列肯定做不到「高吞吐量」这个目标。

分析了内存跟磁盘的优缺点,好像我们还是只能选写内存,但我们忽视了磁盘的两个情况:一是磁盘慢是慢在随机读写,如果是顺序读写,他的速度能达到 600MB/sec(RAID-5 磁盘阵列),并不慢,如果我们尽可能地将数据的读写设计成顺序的,可以大大提升性能。二是 现代的 *** 作系统会(尽可能地)将磁盘里的文件进行缓存

有了 *** 作系统级别的文件缓存,那用磁盘存储队列数据的方式就变得有优势了。首先,磁盘文件的数据会有文件缓存,所以不必担心随机读写的性能;其次,同样是使用内存,磁盘文件使用的是 *** 作系统级别的内存,相比于在 Java 内存堆中存储队列,它没有 GC 问题,也没有 Java 对象的额外内存开销,更可以规避应用重启后的内存 load 数据耗时的问题,而且,文件缓存是 *** 作系统提供的,因为我们只要简单的写磁盘文件,系统复杂性大大降低。

因此,Kafka 直接使用磁盘来存储消息队列的数据。

刚才我们已经决定用磁盘文件来存储队列数据,那么要如何选择数据结构呢?一般情况下,如果需要查找数据并随机访问,我们会用 B+ 树来存储数据,但其时间复杂度是 O(log N),由于我们设计的是消息队列,我们可以完全顺序的写收到的生产者消息,消费者消费时,只要记录下消费者当前消费的位置,往后消费就可以了,这样可以对文件尽可能的进行顺序读写,同时,时间复杂度是O(1)。其实,这跟我们写日志的方式很像,每条日志顺序 append 到日志文件。

之前我们已经确定采用直接顺序写磁盘文件的方式来存储队列数据,下面我们来剖析下具体的实现细节。

在 Kafka 中,用一个文件夹存储一条消息队列,成为一个 Log,每条消息队列由多个文件组成,每个文件称为一个 LogSegment,每当一个 LogSegment 的大小到达阈值,系统就会重新生成一个 LogSegment;当旧的 LogSegment 过期需要清理时(虽然磁盘空间相对于内存会宽裕很多,我们可以保存更长时间的消息数据,比如一周,以供消费者更灵活的使用,但还是需要定期清理太老的数据),系统会根据清理策略删除这些文件。

现在我们知道一个队列(Log)是由多个队列段文件(LogSegment)组成的,那么 Kafka 是如何将这些文件逻辑上连接从而组成一条有序队列的呢?在生成每个队列段文件时,Kafka 用该段的初始位移来对其命名,如在新建一个队列时,会初始化第一个队列段文件,那么其文件名就是0,假设每个段的大小是固定值 L,那么第二个段文件名就是 L,第 N 个就是 (N - 1) L。这样,我们就可以根据文件名对段文件进行排序,排序后的顺序就是整个队列的逻辑顺序。

了解了队列的基本实现,下面我们就来分析下队列的核心 *** 作—读和写。

写 *** 作发生在生产者向队列生产消息时,在上篇文章讲网络通信时我们已经说到,所有的客户端请求会根据协议转到一个 Handler 来具体处理,负责写 *** 作的 Handler 叫 ProducerHandler,整个写请求的流程如下:

之前我们说过,如果是顺序写,由于省掉了磁头寻址的时间,磁盘的性能还是很高的,我们看到 Kakfa 队列是以顺序方式写的,所以性能很高。但是,如果一台 Kafka 服务器有很多个队列,而硬盘的磁头是有限的,所以还是得在不同的队列直接来回切换寻址,性能会有所下降。

队列的读 *** 作发送在消费者消费队列数据时,由于队列是线性的,只需要记录消费者上次消费到了哪里(offset),接下去消费就好了。那么首先会有一个问题,由谁来记消费者到底消费到哪里了?

一般情况下,我们会想到让服务端来记录各个消费者当前的消费位置,当消费者来拉数据,根据记录的消费位置和队列的当前位置,要么返回新的待消费数据,要么返回空。让服务端记录消费位置,当遇到网络异常时会有一些问题,比如服务端将消息发给消费者后,如果网络异常消费者没有收到消息,那么这条消息就被「跳过」了,当然我们可以借鉴二阶段提交的思想,服务端将消息发送给消费者后,标记状态为「已发送」,等消费者消费成功后,返回一个 ack 给服务端,服务端再将其标记为「成功消费」。不过这样设计还是会有一个问题,如果消费者没有返回 ack 给服务端,此时这条消息可能在已经被消费也可能还没被消费,服务端无从得知,只能根据人为策略跳过(可能会漏消息)或者重发(可能存在重复数据)。另一个问题是,如果有很多消费者,服务端需要记录每条消息的每个消费者的消费状态,这在大数据的场景下,非常消耗性能和内存。

Kafka 将每个消费者的消费状态记录在消费者本身(隔一段时间将最新消费状态同步到 zookeeper),每次消费者要拉数据,就给服务端传递一个 offset,告诉服务端从队列的哪个位置开始给我数据,以及一个参数 length,告诉服务端最多给我多大的数据(批量顺序读数据,更高性能),这样就能使服务端的设计复杂度大大降低。当然这解决不了一致性的问题,不过消费者可以根据自己程序特点,更灵活地处理事务。

下面就来分析整个读的流程:

分布式系统中不可避免的会遇到一致性问题,主要是两块:生产者与队列服务端之间的一致性问题、消费者与队列服务端之间的一致性问题,下面依次展开。

当生产者向服务端投递消息时,可能会由于网络或者其他问题失败,如果要保证一致性,需要生产者在失败后重试,不过重试又会导致消息重复的问题,一个解决方案是每个消息给一个唯一的 id,通过服务端的主动去重来避免重复消息的问题,不过这一机制目前 Kafka 还未实现。目前 Kafka 提供配置,供用户不同场景下选择允许漏消息(失败后不重试)还是允许重复消息(失败后重试)。

由于在消费者里我们可以自己控制消费位置,就可以更灵活的进行个性化设计。如果我们在拉取到消息后,先增加 offset,然后再进行消息的后续处理,如果在消息还未处理完消费者就挂掉,就会存在消息遗漏的问题;如果我们在拉取到消息后,先进行消息处理,处理成功后再增加 offset,那么如果消息处理一半消费者挂掉,会存在重复消息的问题。要做到完全一致,最好的办法是将 offset 的存储与消费者放一起,每消费一条数据就将 offset+1。

本文介绍了 Kafka 的队列实现以及其读写过程。Kafka 认为 *** 作系统级别的文件缓存比 Java 的堆内存更省空间和高效,如果生产者消费者之间比较「和谐」的话,大部分的读写 *** 作都会落在文件缓存,且在顺序读写的情况下,硬盘的速度并不慢,因此选择直接写磁盘文件的方式存储队列。在队列的读写过程中,Kafka 尽可能地使用顺序读写,并使用零拷贝来优化性能。最后,Kafka 让消费者自己控制消费位置,提供了更加灵活的数据消费方式。

kafka分区过的弊端
kafka分区数量是理论是无限的。但是无限多的分区也是会影响kafka和kafka所在的服务器的性能 。

如何确定分区数量
可以遵循一定的步骤来尝试确定分区数:创建一个只有1个分区的topic,然后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,那么分区数 = Tt / max(Tp, Tc)

使用压测工具,得出最佳分区数
kafka官方也提供了脚本方便我们针对我们的kafka集群做测试,我们可以测试当前提供的硬件条件进行压测,得出当前机器环境到底能支持多少分区数,从而达到尽量最优的方案。
生产者性能测试脚本:kafka-producer-perf-testsh
消费者性能测试脚本:kafka-consumer-perf-testsh

脚本的使用方法

kafka服务器停止,客户端需要重启。除此之外,如果您的Kafka经纪人正在/lib/systemd/system/从Confluent Platform的最新发行版中作为服务运行(位于下),则可以停止该服务或者进行重新启动服务。

建议从头阅读:
银行系统中的消息分发利器Kafka(一)
银行系统中的消息分发利器Kafka(二)

6、Partition
上次我们说到,Kafka可以存储数据,而且数据按照Topic进行分类。
这些存储的数据可能会很大,这可能会给Kafka的Broker带来很大的存储压力。
一个好的解决办法就是把这些数据拆成一个或多个Partition:

然后,把这多个Partition分发到不同的服务器上。
Kafka是一个分布式系统,所以对数据文件的Partition进行分布式管理是很方便的。
随之,另外一个问题来了,我们要把数据分成多少个Partition呢?

在每一个Partition 中,第一个消息的Offset就是0,第二个就是1,以此类推。另外,Offset并不是一个全局的ID,它只作用于所属的Partition。所以,在同一个Partition中,不会有相同的Offset。
结合上面的知识,我们可以知道,如果要在Kafka中定位一个消息信息,就是先找到Topic,然后找到Partition,最后找到Offset。

8、Consumer Group
先把前面的场景复习一下。
首先我们有很多节点的数据要收集,于是我们通过Kafka来实现:

然后我们为每一个节点创建一个Producer:

这时你会发现,处理压力跑到Conumser那里了,于是我们就需要一个Consumer Group了。

Kafka的几个重要的概念就介绍完了。后面我会逐步深入的介绍Kafka的一些细节,欢迎关注~

一、启动ZooKeeper

打开一个新终端并键入以下命令

二、要启动Kafka Broker,请键入以下命令

启动Kafka Broker后,在ZooKeeper终端上键入命令 jps ,您将看到以下响应

现在你可以看到两个守护进程运行在终端上,QuorumPeerMain是ZooKeeper守护进程,另一个是Kafka守护进程。

三、创建Kafka主题

四、主题列表
要获取Kafka服务器中的主题列表,可以使用以下命令

输出

五、启动生产者以发送消息

六、启动消费者以接收消息

七、单节点多代理配置
创建多个Kafka Brokers,我们在配置/ serverproperties中已有一个Kafka代理实例。 现在我们需要多个代理实例,因此将现有的serverprop-erties文件复制到两个新的配置文件中,并将其重命名为server-oneproperties和server-twoproperties。 然后编辑这两个新文件并分配以下更改

config / server-oneproperties

创建主题
让我们为此主题将复制因子值指定为三个,因为我们有三个不同的代理运行。 如果您有两个代理,那么分配的副本值将是两个。

Describe 命令用于检查哪个代理正在侦听当前创建的主题

输出

八、修改主题


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zz/13473803.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-08-14
下一篇 2023-08-14

发表评论

登录后才能评论

评论列表(0条)

保存