Kafka的流行归功于它设计和 *** 作简单、存储系统高效、充分利用磁盘顺序读写等特性、非常适合在线日志收集等高吞吐场景。
Kafka特性之一是它的复制协议。复制协议是保障kafka高可靠性的关键。对于单个集群中每个Broker不同工作负载情况下,如何自动调优Kafka副本的工作方式是比较有挑战的。它的挑战之一是要知道如何避免follower进入和退出同步副本列表(即ISR)。从用户的角度来看,如果生产者发送一大批海量消息,可能会引起Kafka Broker很多警告。这些警报表明一些topics处于“under replicated”状态,这些副本处于同步失败或失效状态,更意味着数据没有被复制到足够数量Broker从而增加数据丢失的概率。因此Kafka集群中处于“under replicated”中Partition数要密切监控。这个警告应该来自于Broker失效,减慢或暂停等状态而不是生产者写不同大小消息引起的。
Kafka中主题的每个Partition有一个预写式日志文件,每个Partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition中,Partition中的每个消息都有一个连续的序列号叫做offset, 确定它在分区日志中唯一的位置。
Kafka每个topic的partition有N个副本,其中N是topic的复制因子。Kafka通过多副本机制实现故障自动转移,当Kafka集群中一个Broker失效情况下仍然保证服务可用。在Kafka中发生复制时确保partition的预写式日志有序地写到其他节点上。N个replicas中。其中一个replica为leader,其他都为follower,leader处理partition的所有读写请求,与此同时,follower会被动定期地去复制leader上的数据。
如下图所示,Kafka集群中有4个broker, 某topic有3个partition,且复制因子即副本个数也为3:
Kafka提供了数据复制算法保证,如果leader发生故障或挂掉,一个新leader被选举并被接受客户端的消息成功写入。Kafka确保从同步副本列表中选举一个副本为leader,或者说follower追赶leader数据。leader负责维护和跟踪ISR(In-Sync Replicas的缩写,表示副本同步队列,具体可参考下节)中所有follower滞后的状态。当producer发送一条消息到broker后,leader写入消息并复制到所有follower。消息提交之后才被成功复制到所有的同步副本。消息复制延迟受最慢的follower限制,重要的是快速检测慢副本,如果follower“落后”太多或者失效,leader将会把它从ISR中删除。
副本同步队列(ISR)
所谓同步,必须满足如下两个条件:
默认情况下Kafka对应的topic的replica数量为1,即每个partition都有一个唯一的leader,为了确保消息的可靠性,通常应用中将其值(由broker的参数offsets.topic.replication.factor指定)大小设置为大于1,比如3。 所有的副本(replicas)统称为Assigned Replicas,即AR。ISR是AR中的一个子集,由leader维护ISR列表,follower从leader同步数据有一些延迟。任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。
上一节中的HW俗称高水位,是HighWatermark的缩写,取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置。另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。对于来自内部broKer的读取请求,没有HW的限制。
下图详细的说明了当producer生产消息至broker后,ISR以及HW和LEO的流转过程:
由此可见,Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的follower都复制完,这条消息才会被commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种情况下如果follower都还没有复制完,落后于leader时,突然leader宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。
副本不同步的异常情况
broker 分配的任何一个 partition 都是以 Replica 对象实例的形式存在,而 Replica 在 Kafka 上是有两个角色: leader 和 follower,只要这个 Replica 是 follower,它便会向 leader 进行数据同步。
反映在 ReplicaManager 上就是如果 Broker 的本地副本被选举为 follower,那么它将会启动副本同步线程,其具体实现如下所示:
简单来说,makeFollowers() 的处理过程如下:
关于第6步,并不一定会为每一个 partition 都启动一个 fetcher 线程,对于一个目的 broker,只会启动 num.replica.fetchers 个线程,具体这个 topic-partition 会分配到哪个 fetcher 线程上,是根据 topic 名和 partition id 进行计算得到,实现所示:
如上所示,在 ReplicaManager 调用 makeFollowers() 启动 replica fetcher 线程后,它实际上是通过 ReplicaFetcherManager 实例进行相关 topic-partition 同步线程的启动和关闭,其启动过程分为下面两步:
addFetcherForPartitions() 的具体实现如下所示:
这个方法其实是做了下面这几件事:
ReplicaFetcherManager 创建 replica Fetcher 线程的实现如下:
replica fetcher 线程在启动之后就开始进行正常数据同步流程了,这个过程都是在 ReplicaFetcherThread 线程中实现的。
ReplicaFetcherThread 的 doWork() 方法是一直在这个线程中的 run() 中调用的,实现方法如下:
在 doWork() 方法中主要做了两件事:
processFetchRequest() 这个方法的作用是发送 Fetch 请求,并对返回的结果进行处理,最终写入到本地副本的 Log 实例中,其具体实现:
其处理过程简单总结一下:
fetch() 方法作用是发送 Fetch 请求,并返回相应的结果,其具体的实现,如下:
processPartitionData
这个方法的作用是,处理 Fetch 请求的具体数据内容,简单来说就是:检查一下数据大小是否超过限制、将数据追加到本地副本的日志文件中、更新本地副本的 hw 值。
在副本同步的过程中,会遇到哪些异常情况呢?
大家一定会想到关于 offset 的问题,在 Kafka 中,关于 offset 的处理,无论是 producer 端、consumer 端还是其他地方,offset 似乎都是一个形影不离的问题。在副本同步时,关于 offset,会遇到什么问题呢?下面举两个异常的场景:
以上两种情况都是 offset OutOfRange 的情况,只不过:一是 Fetch Offset 超过了 leader 的 LEO,二是 Fetch Offset 小于 leader 最小的 offset
在介绍 Kafka 解决方案之前,我们先来自己思考一下这两种情况应该怎么处理?
上面是我们比较容易想出的解决方案,而在 Kafka 中,其解决方案也很类似,不过遇到情况比上面我们列出的两种情况多了一些复杂,其解决方案如下:
针对第一种情况,在 Kafka 中,实际上还会发生这样一种情况,1 在收到 OutOfRange 错误时,这时去 leader 上获取的 LEO 值与最小的 offset 值,这时候却发现 leader 的 LEO 已经从 800 变成了 1100(这个 topic-partition 的数据量增长得比较快),再按照上面的解决方案就不太合理,Kafka 这边的解决方案是:遇到这种情况,进行重试就可以了,下次同步时就会正常了,但是依然会有上面说的那个问题。
replica fetcher 线程关闭的条件,在三种情况下会关闭对这个 topic-partition 的拉取 *** 作:
这里直接说线程关闭,其实不是很准确,因为每个 replica fetcher 线程 *** 作的是多个 topic-partition,而在关闭的粒度是 partition 级别,只有这个线程分配的 partition 全部关闭后,这个线程才会真正被关闭。
stopReplica
StopReplica 的请求实际上是 Controller 发送过来的,这个在 controller 部分会讲述,它触发的条件有多种,比如:broker 下线、partition replica 迁移等等。
makeLeaders
makeLeaders() 方法的调用是在 broker 上这个 partition 的副本被设置为 leader 时触发的,其实现如下:
调用 ReplicaFetcherManager 的 removeFetcherForPartitions() 删除对这些 topic-partition 的副本同步设置,这里在实现时,会遍历所有的 replica fetcher 线程,都执行 removePartitions() 方法来移除对应的 topic-partition 集合。
removePartitions
这个方法的作用是:ReplicaFetcherThread 将这些 topic-partition 从自己要拉取的 partition 列表中移除。
ReplicaFetcherThread的关闭
前面介绍那么多,似乎还是没有真正去关闭,那么 ReplicaFetcherThread 真正关闭是哪里 *** 作的呢?
实际上 ReplicaManager 每次处理完 LeaderAndIsr 请求后,都会调用 ReplicaFetcherManager 的 shutdownIdleFetcherThreads() 方法,如果 fetcher 线程要拉取的 topic-partition 集合为空,那么就会关闭掉对应的 fetcher 线程。
实际项目中我们可能在创建topic时没有设置好正确的replication-factor,导致kafka集群虽然是高可用的,但是该topic在有broker宕机时,可能发生无法使用的情况。topic一旦使用又不能轻易删除重建,因此动态增加副本因子就成为最终的选择。
原因分析:
假设我们有3个kafka broker分别brokerA、brokerB、brokerC.
如何动态给已经创建的topic添加replication-factor?
可能很多人想使用kafka-topics.sh脚本,那么事情情况如何了?
截图
可以看出kafka-topics.sh不能用来增加副本因子replication-factor。实际应该使用kafka bin目录下面的kafka-reassign-partitions.sh。
a,首先我们配置topic的副本,保存为json文件()
例如, 我们想把yqtopic01的部分设置为3,(我的kafka集群有3个broker,id分别为0,1,2), json文件名称为increase-replication-factor.json
{"version":1,
"partitions":[
{"topic":"yqtopic01","partition":0,"replicas":[0,1,2]},
{"topic":"yqtopic01","partition":1,"replicas":[0,1,2]},
{"topic":"yqtopic01","partition":2,"replicas":[0,1,2]}
]}
b,然后执行脚本
./kafka-reassign-partitions.sh -zookeeper 127.0.0.1:2181 --reassignment-json-file increase-replication-factor.json --execute
kafka-reassign-partitions.sh执行截图
我们可以通过执行
kafka-topics.sh --describe --zookeeper localhost:2181 --topic yqtopic01查看现在该topic的副本因子。
总结
所有文档官方文档最权威。 https://kafka.apache.org/documentation/#basic_ops_increase_replication_factor
摘录如下:
Increasing replication factor
Increasing the replication factor of an existing partition is easy. Just specify the extra replicas in the custom reassignment json file and use it with the --execute option to increase the replication factor of the specified partitions.
For instance, the following example increases the replication factor of partition 0 of topic foo from 1 to 3. Before increasing the replication factor, the partition's only replica existed on broker 5. As part of increasing the replication factor, we will add more replicas on brokers 6 and 7.
The first step is to hand craft the custom reassignment plan in a json file:
Then, use the json file with the --execute option to start the reassignment process:
The --verify option can be used with the tool to check the status of the partition reassignment. Note that the same increase-replication-factor.json (used with the --execute option) should be used with the --verify option:
You can also verify the increase in replication factor with the kafka-topics tool:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)