admin管理类 包含了 元数据缓存和 zk 监听客户端
class AdminManager(val config: KafkaConfig,
val metrics: Metrics,
val metadataCache: metadataCache,
val zkClient: KafkaZkClient)
- zk 注册监听处理器
case EventType.NodeChildrenChanged => zNodeChildChangeHandlers.get(path).foreach(.handleChildChange())
case EventType.NodeCreated => zNodeChangeHandlers.get(path).foreach(.handleCreation())
case EventType.NodeDeleted => zNodeChangeHandlers.get(path).foreach(.handleDeletion())
case EventType.NodeDataChanged => zNodeChangeHandlers.get(path).foreach(.handleDataChange()) - 监听到的事件放入 事件管理器的 事件队列 EventManager
根据事件类型 KafkaController::process
集群启动后,broker 在 /controller 节点注册controller, 负责启动自选举或者分配 partition 主备策略。
- controller 不存在, brokerid 和 epoch 写入 zk 这个节点
- 对于controller 注册成功的 broker ,tcp 消息 通知其他broker 更新元数据,此时只有所有brokerid
- 初始化本地副本状态机, 初始化分区状态机
- 对于配置有推荐leader(PreferredReplicaLeaderElection),在 brokers/topics 节点根据配置 leader和isr, 更新分区状态机, 并通知其他brokerid
- 如果配置自由选leader,启动 AutoPreferredReplicaLeaderElection
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)