你想知道的所有关于Kafka Leader选举流程和选举策略都在这了(内含12张高清大图)

你想知道的所有关于Kafka Leader选举流程和选举策略都在这了(内含12张高清大图),第1张

你想知道的所有关于Kafka Leader选举流程和选举策略都在这了(内含12张高清大图)

目录

点击阅读原文查看高清大图

思考几个问题

    什么是分区状态机?

    创建Topic的时候如何选举Leader?

    分区的所有副本都不在线, 这个时候启动一台之前不在ISR内的副本,它会当选为Leader吗?

    当所有副本都不在线,然后一个一个重启Broker上副本上线,谁会当选为Leader?谁先启动就谁当选吗?

    Broker下线了,Leader切换给了其他副本, 当Broker重启的时候,Leader会还给之前的副本吗?

    选举成功的那一刻, 生产者和消费着都做了哪些事情?

    Leader选举期间对分区的影响

分区Leader选举流程分析

在开始源码分析之前, 大家先看下面这张图, 好让自己对Leader选举有一个非常清晰的认知,然后再去看后面的源码分析文章,会更容易理解。

 

    触发选举场景  图左

    执行选举流程  图中

    Leader选举策略  图右

分区状态机

首先大家得了解两个状态机

1. 分区状态机  控制分区状态流转

2. 副本状态机  控制副本状态流转

这里我们主要讲解分区状态机,这张图表示的是分区状态机

分区状态机 

    NonExistentPartition :分区在将要被创建之前的初始状态是这个,表示不存在

    NewPartition: 表示正在创建新的分区, 是一个中间状态, 这个时候只是在Controller的内存中存了状态信息

    OnlinePartition: 在线状态, 正常的分区就应该是这种状态,只有在线的分区才能够提供服务

    OfflinePartition: 下线状态, 分区可能因为Broker宕机或者删除Topic等原因流转到这个状态, 下线了就不能提供服务了

    NonExistentPartition: 分区不存在的状态, 当Topic删除完成成功之后, 就会流转到这个状态, 当还处在删除中的时候,还是停留在下线状态。

我们今天要讲的Leader选举
就是在之前状态=>OnlinePartition状态的时候发生的。

Leader选举流程分析

源码入口:

PartitionStateMachine#electLeaderForPartitions

因篇幅原因源码省略

想获得更好的阅读体验和【查看源码】  请点击【阅读原文】
 

可以看到 我们最终是调用了doElectLeaderForPartitions 执行分区Leader选举。

PartitionStateMachine#doElectLeaderForPartitions

总结一下上面的源码

    去zookeeper节点/broker/topics/{topic名称}/partitions/{分区号}/state 节点读取基本信息。

    遍历从zk中获取的leaderIsrAndControllerEpoch信息,做一些简单的校验:zk中获取的数据的controllerEpoch必须<=当前的Controller的controller_epoch。最终得到 validLeaderAndIsrs, controller_epoch 就是用来防止脑裂的, 当有两个Controller当选的时候,他们的epoch肯定不一样, 那么最新的epoch才是真的Controller

    如果没有获取到有效的validLeaderAndIsrs 信息 则直接返回

    根据入参partitionLeaderElectionStrategy 来匹配不同的Leader选举策略。来选出合适的Leader和ISR信息

    根据上面的选举策略选出的 LeaderAndIsr  信息进行遍历, 将它们一个个写入到zookeeper节点/broker/topics/{topic名称}/partitions/{分区号}/state中。 (当然如果上面没有选择出合适的leader,那么久不会有这个过程了)

    遍历上面写入zk成功的分区, 然后更新Controller里面的分区leader和isr的内存信息 并发送LeaderAndISR请求,通知对应的Broker Leader更新了。

Leader选举流程 

看上面的Leader选举策略是不是很简单, 但是中间究竟是如何选择Leader的? 这个是根据传入的策略类型, 来做不同的选择

那么有哪些策略呢?以及什么时候触发这些选举呢?

分区的几种策略以及对应的触发场景 1. OfflinePartitionLeaderElectionStrategy

遍历分区的AR, 找到第一个满足以下条件的副本:

    副本在线

    在ISR中。

如果找不到满足条件的副本,那么再根据 传入的参数allowUnclean判断

    allowUnclean=true:AR顺序中所有在线副本中的第一个副本。

    allowUnclean=false: 需要去查询配置  unclean.leader.election.enable 的值。
    若=true ,则跟上面 1一样 。
    若=false,直接返回None,没有找到合适的Leader。

     

离线分区Leader选举策略 

源码位置:

Election#leaderForOffline

因篇幅原因源码省略

想获得更好的阅读体验和【查看源码】  请点击【阅读原文】
 

    先组装所有给定的 validLeaderAndIsrs 的信息 其实主要还是要去获取每个Topic的对应的unclean.leader.election.enable 属性值。
    默认情况下,我们调用到这里的时候 这个入参allowUnclean=false.
    如果是false 那我们需要去查询一下指定的topic它的属性unclean.leader.election.enable 是什么
    如果是true 则表示直接覆盖了unclean.leader.election.enable的配置为true。

     

    在这里插入图片描述

    找到 第一个满足条件:副本在线 && 在 ISR中的副本。

    如果没有满足条件的 则判断入uncleanLeaderElectionEnabled的配置 如果是true,则从不在isr中的存活副本中获取副本作为leader。 当然这个uncleanLeaderElectionEnabled 参数是上 步骤1中决定的。

触发场景:Controller 重新加载

Controller 当选的时候会启动 分区状态机 partitionStateMachine, 启动的时候会重新加载所有分区的状态到内存中, 并触发 对处于 NewPartition 或 OfflinePartition 状态的所有分区尝试变更为  OnlinePartition 状态的状态。把新创建的分区和离线的分区触发一下选举流程啊

触发源码入口:

KafkaController#onControllerFailover

partitionStateMachine.startup()

 partitionStateMachine.triggeronlinePartitionStateChange()
 

触发场景:脚本执行脏选举

当执行 kafka-leader-election.sh 的时候并且模式选择的是UNCLEAN . 则会触发这个模式。
这里注意一下,入参allowUnclean = (electionTrigger == AdminClientTriggered)
意思是: 当触发的场景是AdminClientTriggered的时候, 则allowUnclean=true,表示 不关心配置参数 unclean.leader.election.enable 是什么
如果没有找到符合条件的Leader, 则就去非ISR 列表找Leader。 刚好我们执行脚本的时候触发器就是AdminClientTriggered其他触发器有:
AutoTriggered : 定时自动触发。
ZkTriggered:Controller切换的时候触发的(zk节点/controller 的变更便是Controller角色的切换) AdminClientTriggered:客户端主动触发。

触发场景:Controller 监听到有Broker启动了

同上。

触发源码入口:

KafkaController#processBrokerChange#onBrokerStartup

   partitionStateMachine.triggeronlinePartitionStateChange()

触发场景:Controller 监听 LeaderAndIsrResponseReceived请求

同上。

当Controller向对应的Broker发起 LeaderAndIsrRequest 请求的时候.
有一个回调函数callback, 这个回调函数会向Controller发起一个事件为  LeaderAndIsrResponseReceived 请求。

具体源码在:
ControllerChannelManager#sendLeaderAndIsrRequest

在这里插入图片描述

Controller收到这个事件的请求之后,根据返回的 leaderAndIsrResponse 数据
会判断一下有没有新增加的离线副本(一般都是由于磁盘访问有问题)
如果有新的离线副本,则需要将这个离线副本标记为Offline状态

源码入口:

KafkaController#onReplicasBecomeOffline

  partitionStateMachine.triggeronlinePartitionStateChange()

触发场景:Controller 监听 UncleanLeaderElectionEnable请求

当我们在修改动态配置的时候, 将动态配置:unclean.leader.election.enable设置为 true 的时候
会触发向Controller发起UncleanLeaderElectionEnable的请求,这个时候则需要触发一下。触发请求同上。

触发源码入口:

KafkaController#processTopicUncleanLeaderElectionEnable

 partitionStateMachine.triggeronlinePartitionStateChange(topic)

上面的触发调用的代码就是下面的接口

对处于 NewPartition 或 OfflinePartition 状态的所有分区尝试变更为OnlinePartition 的状态。 状态的流程触发了Leader选举。

    获取所有 OfflinePartition 、NewPartition 的分区状态

    尝试将 所有 NewPartition or OfflinePartition 状态的分区全部转别成 OnlinePartition状态, 但是如果对应的Topic正在删除中,则会被排除掉

    分区状态机进行状态流转 使用 OfflinePartitionLeaderElectionStrategy 选举策略(allowUnclean=true 表示如果从isr中没有选出leader,则允许从非isr列表中选举leader ,allowUnclean=false 表示如果从isr中没有选出leader, 则需要去读取配置文件的配置 unclean.leader.election.enable 来决定是否允许从非ISR列表中选举Leader。 )

2. ReassignPartitionLeaderElectionStrategy

分区副本重分配选举策略: 当执行分区副本重分配的时候, 原来的Leader可能有变更, 则需要触发一下 Leader选举。

    只有当之前的Leader副本在经过重分配之后不存在了。
    例如: [2,0] ==> [1,0] 。 原来2是Leader副本,经过重分配之后变成了 [1,0]。2已经不复存在了,所以需要重新选举Leader。

    当原来的分区Leader副本 因为某些异常,下线了。需要重新选举Leader

     

分区副本重分配发生的Leader选举.

Election#leaderForReassign

因篇幅原因源码省略

想获得更好的阅读体验和【查看源码】  请点击【阅读原文】
 

总结:

从当前的副本分配列表中,获取副本在线&&副本在ISR中的 第一个副本,遍历的顺序是当前副本的分配方式(AR),跟ISR的顺序没有什么关系。

触发场景:分区副本重分配

并不是每次执行分区副本重分配都会触发这个Leader选举策略, 下面两种情况才会触发

    只有当之前的Leader副本在经过重分配之后不存在了。例如: [2,0] ==> [1,0] 。 原来2是Leader副本,经过重分配之后变成了 [1,0]。2已经不复存在了,所以需要重新选举Leader。

    当原来的分区Leader副本 因为某些异常,下线了。需要重新选举Leader

对应的判断条件代码如下:

KafkaController#moveReassignedPartitionLeaderIfRequired

点击查看分区重分配的源码解析

3. PreferredReplicaPartitionLeaderElectionStrategy

优先副本选举策略, 必须满足三个条件:
是第一个副本&&副本在线&&副本在ISR列表中。
满足上面三个条件才会当选leader,不满足则不会做变更。

 

 

优先副本选举 (点击阅读原文看高清大图)

 def leaderForPreferredReplica(controllerContext: ControllerContext,
                                leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)]): Seq[ElectionResult] = {
    leaderAndIsrs.map { case (partition, leaderAndIsr) =>
      leaderForPreferredReplica(partition, leaderAndIsr, controllerContext)
    }
  }

   private def leaderForPreferredReplica(partition: TopicPartition,
                                        leaderAndIsr: LeaderAndIsr,
                                        controllerContext: ControllerContext): ElectionResult = {
    // AR列表                                    
    val assignment = controllerContext.partitionReplicaAssignment(partition)
    // 在线副本
    val liveReplicas = assignment.filter(replica => controllerContext.isReplicaonline(replica, partition))
    val isr = leaderAndIsr.isr
    // 找出第一个副本 是否在线 并且在ISR中。
    val leaderOpt = PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection(assignment, isr, liveReplicas.toSet)
    // 组装leaderandisr返回 ,注意这里是没有修改ISR信息的
    val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderAndIsr.newLeader(leader))
    ElectionResult(partition, newLeaderAndIsrOpt, assignment)
  }

  def preferredReplicaPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {
    assignment.headOption.filter(id => liveReplicas.contains(id) && isr.contains(id))
  }
  

    从内存中获取TopicPartition的分配方式

    过滤不在线的副本

    找到第一个副本判断一下是否在线&&在ISR列表中。如果满足,则选他为leader,如果不满足,也不会再找其他副本了。

    返回leaderAndIsr信息, 这里的ISR是没有做修改的。

触发场景:自动定时执行优先副本选举任务

Controller 启动的时候,会启动一个定时任务 。每隔一段时间就去执行 优先副本选举任务。

与之相关配置:

## 如果为true表示会创建定时任务去执行 优先副本选举,为false则不会创建
auto.leader.rebalance.enable=true 

## 每隔多久执行一次 ; 默认300秒;
leader.imbalance.check.interval.seconds partition = 300

##标识每个 Broker 失去平衡的比率,如果超过该比率,则执行重新选举 Broker 的 leader;默认比例是10%;
##这个比率的算法是 :broker不平衡率=非优先副本的leader个数/总分区数,
##假如一个topic有3个分区[0,1,2],并且有3个副本 ,正常情况下,[0,1,2]分别都为一个leader副本; 这个时候 0/3=0%;
leader.imbalance.per.broker.percentage = 10

触发场景: Controller 重新加载的时候

在这个触发之前还有执行
partitionStateMachine.startup()
相当于是先把 OfflinePartition、NewPartition状态的分区执行了OfflinePartitionLeaderElectionStrategy 策略。
然后又执行了
PreferredReplicaPartitionLeaderElectionStrategy策略 这里是从zk节点 /admin/preferred_replica_election 读取数据, 来进行判断是否有需要执行Leader选举的分区
它是在执行kafka-preferred-replica-election 命令的时候会创建这个zk节点
但是这个已经被标记为废弃了,并且在3.0的时候直接移除了。

源码位置:

KafkaController#onControllerFailover

 // 从zk节点/admin/preferred_replica_election找到哪些符合条件需要执行优先副本选举的分区
 val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
 // 这里的触发类型 是 ZkTriggered
 onReplicaElection(pendingPreferredReplicaElections, ElectionType.PREFERRED, ZkTriggered)
 
private def fetchPendingPreferredReplicaElections(): Set[TopicPartition] = {
 // 去zk读取节点  /admin/preferred_replica_election
    val partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection
    // 如果指定分区的 leader 已经是AR的第一个副本 或者 topic被删除了,则 过滤掉这个分区(没有必要执行leader选举了)
    val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
      val replicas = controllerContext.partitionReplicaAssignment(partition)
      val topicDeleted = replicas.isEmpty
      val successful =
        if (!topicDeleted) controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == replicas.head else false
      successful || topicDeleted
    }
    // 将zk获取到的分区数据 - 刚刚需要忽略的数据 = 还需要执行选举的数据
    val pendingPreferredReplicaElectionsIgnoringTopicDeletion = partitionsUndergoingPreferredReplicaElection -- partitionsThatCompletedPreferredReplicaElection
    // 找到哪些分区正在删除
    val pendingPreferredReplicaElectionsSkippedFromTopicDeletion = pendingPreferredReplicaElectionsIgnoringTopicDeletion.filter(partition => topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic))
    // 待删除的分区也过滤掉
    val pendingPreferredReplicaElections = pendingPreferredReplicaElectionsIgnoringTopicDeletion -- pendingPreferredReplicaElectionsSkippedFromTopicDeletion
 // 返回最终需要执行优先副本选举的数据。
    pendingPreferredReplicaElections
  }
  

触发场景:执行优先副本选举脚本的时候

执行脚本kafka-leader-election.sh 并且选择的模式是 PREFERRED (优先副本选举) 则会选择  PreferredReplicaPartitionLeaderElectionStrategy 策略选举

4. ControlledShutdownPartitionLeaderElectionStrategy

受控关机选举策略 :
当Broker关机的过程中,会向Controller发起一个请求, 让它重新发起一次选举, 把在所有正在关机(也就是发起请求的那个Broker,或其它同时正在关机的Broker) 的Broker里面的副本给剔除掉。


根据算法算出leader:找到第一个满足条件的副本:
副本在线 && 副本在ISR中  && 副本所在的Broker不在正在关闭的Broker集合中 。

构造新的ISR列表: 在之前的isr列表中将  正在被关闭的Broker里面的副本 给剔除掉

受控关机Leader选举策略 (点击阅读原文查看高清大图)

Election#leaderForControlledShutdown

 

触发场景:Broker关机的时候

当Broker关闭的时候, 会向Controller发一起一个ControlledShutdownRequest请求,  Controller收到这个请求会针对性的做一些善后事件。比如说 执行Leader重选举 等等之类的。

源码位置:KafkaServer#controlledShutdown

Controller收到请求的源码位置:KafkaController#doControlledShutdown

与之相关的配置有:

在这里插入图片描述

其他场景 新创建的Topic Leader选举策略

创建新的Topic的时候,并没有发生Leader选举的 *** 作, 而是默认从分区对应的所有在线副本中选择第一个为leader, 然后isr就为 所有在线副本,再组装一下当前的controller_epoch信息,写入到zk节点/brokers/topics/{Topic名称}/partitions/{分区号}/state中。
最后发起  LeaderAndIsrRequest 请求,通知 leader 的变更。

详细看看源码:

PartitionStateMachine#doHandleStateChanges
分区状态从 NewPartition流转到OnlinePartition

    从当前的Controller 内存中获取所有入参的分区对应的副本信息

    过滤那些已经下线的副本( Broker宕机、网络异常、磁盘脱机、等等都有可能造成副本下线) 。

    每个分区对应的所有在线副本信息 为 ISR 信息,然后取ISR的第一个副本为leader分区。当然特别注意一下, 这个时候获取的isr信息的顺序就是 分区创建时候分配好的AR顺序, 获取第一个在线的。(因为在其他情况下 ISR的顺序跟AR的顺序并不一致)

    组装 上面的 isr、leader、controller_epoch 等信息 写入到zk节点 /brokers/topics/{Topic名称}/partitions/{分区号}/state例如下面所示
    {"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0,1,2]}
    

    然后向其他相关Broker 发起 LeaderAndIsrRequest 请求,通知他们Leader和Isr信息已经变更了,去做一下想要的处理。比如去新的leader发起Fetcher请求同步数据。

可以看看之前我们分析过的 Topic创建的源码解析 的原理图 如下

重点看:

4.4.1 已经确定Leader是谁了

回答上面的问题

现在,看完全文之后,我想你应该对下面的问题很清楚了吧!

什么是分区状态机

所有的分区状态的流转都是通过分区状态机来进行的, 统一管理! 每个分区状态的流转 都是有严格限制并且固定的,流转到不同状态需要执行的 *** 作不一样, 例如 当分区状态流转到   OnlinePartition 的时候, 就需要判断是否需要执行 Leader选举 ,

分区状态机

创建Topic的时候如何选举Leader?

创建Topic的时候并没有发生 Leader选举, 而是默认将 在线的第一个副本设置为Leader,所有在线的副本列表 为 ISR 列表。 写入到了zookeeper中。

分区的所有副本都不在线, 这个时候启动一台之前不在ISR内的副本的Broker,它会当选为Leader吗?

视情况而定。 首先, 启动一台Broker, 会用什么策略选举?
看上面的图,我们可以知道是
OfflinePartitionLeaderElectionStrategy

然后看下这个策略是如何选举的?

 

 

点击阅读原文查看高清大图

那么最终结果就是:
所有副本不在线,那么一个Leader的候选者都当选不了
那么这个时候就会判断 unclean.leader.election.enable 配置是否为true.
如果是true, 则当前在线的副本就是只有自己这个刚启动的在线副本,自然而然就会当选Leader了。
如果是fase, 则没有副本能够当前Leader, 次数处于一个无Leader的状态。

当所有副本都不在线,然后一个一个重启Broker上副本上线,谁会当选为Leader?谁先启动就谁当选吗?

不是, 跟上一个问题同理
根据 unclean.leader.election.enable  配置决定。
如果是true, 则谁先启动,谁就当选(会丢失部分数据)
如果是false,则第一个在ISR列表中的副本当选。
顺便再提一句, 虽然在这里可能不是AR中的第一个副本当选Leader。
但是最终还是会自动执行Leader均衡的,自动均衡使用的策略是
PreferredReplicaPartitionLeaderElectionStrategy
(前提是开启了自动均衡: auto.leader.rebalance.enable=true)

Broker下线了,Leader切换给了其他副本, 当Broker重启的时候,Leader会还给之前的副本吗?

根据配置 auto.leader.rebalance.enable=true 决定。true: 会自动执行Leader均衡,  自动均衡策略是 PreferredReplicaPartitionLeaderElectionStrategy 策略
false: 不执行自动均衡。 那么久不会还回去。 关于更详细的 Leader均衡机制请看 Leader 均衡机制

Leader选举期间对分区的影响

Leader的选举基本上不会造成什么影响, Leader的切换非常快, 每个分区不可用的时间在几毫秒内。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5706090.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存