Kafka-Broker的基本模块

Kafka-Broker的基本模块,第1张

Kafka-Broker的基本模块 Broker简介:

      kafka集群是由若干个broker组成,可以简单得理解为broker就是部署在服务器的kafka节点。

 

 

1.SocketServer

    SocketServer作为Broker对外提供Socket服务的模块,主要用于接收socket连接的请求,然后产生相应为之服务的SocketChannel对象。

内部主要包括三个模块:

  • Acceptor主要用于监听Socket连接;
  • Processor主要用于转发Socket的请求和响应。
  • RequestChannel主要用于缓存Socket的请求和响应。

1.1Acceptor对象主要功能

(1)开启socket服务

(2)注册Accept事件

(3)监听此ServerChannel上的ACCEPT事件,当其发生时,将其以轮询的方式把对应的 SocketChannel转交给Processor处理线程。

1.2Processor对象主要功能
(1)当有新的SocketChannel对象进来的时候,注册其上的OP_READ事件以便接收客户端的请求。

(2)从RequestChannel中的响应队列获取对应客户端的请求,然后产生OP_WRITE事件。

(3)监听selector上的事件。如果是读事件,说明有新的request到来,需要转移给   RequestChannel的请求队列;如果是写事件,说明之前的request已经处理完毕,需要从   RequestChannel的响应队列获取响应并发送回客户端;如果是关闭事件,说明客户端已经关闭了   该Socket连接,此时服务端也应该释放相关资源。

1.3RequestChannel

    本质上就是为了解耦SocketServer和KafkaApis两个模块,内部包含Request的阻塞队列和Response的阻塞队列。

注:SocketServer为了防止空闲连接大量存在,采用了LRU算法,即最近最少使用算法,会将长时间没有交互的SocketChannel对象关闭,及时释放资源。因此Processor仅仅是起到了接收Request,发送Response的作用,其处理Request的具体业务逻辑是由KafkaApis层负责的,并且两者之间是通过RequestChannel相互联系起来的。

                                         图: SocketServer内部模块之间的相互联系

总结可得,SocketServer负责下面三个方面:

(1)建立Socket,保持和客户端的通信;

(2)转发客户端的Request;

(3)返回Response给客户端。最后通过RequestChannel与其他模块解耦。

2.KafkaRequestHandlerPool

    KafkaRequestHandlerPool本质上就是一个线程池,里面包含了num.io.threads 个IO处理线程,默认 为8个。KafkaRequestHandlerPool在内部启动了若干个KafkaRequestHandler处理线程,并将RequestChannel对象和KafkaApis对象传递给了KafkaRequestHandler处理线程,因为KafkaRequestHandler需要从前者的requestQueue中取出Request,并且利用后者来完成具体的业务逻辑。

3.KafkaApis

    KafkaApis负责具体的业务逻辑,它主要和Producer、Consumer、Broker Server交互。 KafkaApis主要依赖以下四个组件来完成具体的业务逻辑:

  • LogManager提供针对Kafka的topic日志的读取和写入功能。
  • ReplicaManager提供针对topic分区副本数据的同步功能。
  • OffsetManager提供针对提交至Kafka偏移量的管理功能。
  • KafkaSchedule为其他模块提供定时的调度和管理功能。

3.1LogManager

    LogManager负责提供Broker Server上topic的分区数据读取和写入功能,负责读取和写入位于Broker Server上的所有分区副本数据;如果Partition有多个Replica,则每个Broker Server不会存在相同Partition的Replica;如果存在的话,一旦遇到Broker Server下线,则会立刻丢失Partition的多份副本,失去 了一定的可靠性。

Topic、Partition和Replica三者之间的关联关系:

 

3.2ReplicaManager

    ReplicaManager负责提供针对topic的分区副本数据的同步功能,需要针对不同的变化做出及时响应,例如Partition的Replicas发送Leader切换时,Partition的Replicas所在的Broker Server离线的时候,Partition的Replicas发生Follower同步Leader数据异常的时候,等等。

分区两个名词:AR和ISR

  • AR是Assign Replicas的缩写,代表已经分配给Partition的副本。
  • ISR是In-Sync Replicas的缩写,代表处于同步状态的副本。

并不是所有的AR都是ISR,尤其是当Broker Server离线的时候会导致对应TopicAndPartition的Replica没有及时同步Leader状态的Replica,从而该Replica不是ISR。

a.ReplicaManager是如何实现Replica数据的同步?

主要利用ReplicaFetcherThread(副本数据拉取线程)和Height Watermark Mechanism(高水位线机制)来实现数据的同步管理。

b.什么是高水位?

    本质上代表的是ISR中的所有replicas的last commited message的最小起始偏移量,即在这偏移之前的数据都被ISR所有的replicas所接收,但是在这偏移之后的数据被ISR中的部分replicas所接收。

 其中RecoverPoint代表的是recover-point-offset-checkpoint文件中记录的偏移量,LogEndOffset代表的是当前TopicAndPartition的replica所接收到消息的最大偏移量,HeightWatermark代表的是已经同步给所有ISR的最小偏移量。Replica的HeightWatermark发生更新在以下两种情况:

(1)Leader状态的Replica接收到其他Follower状态的Replica的FetchRequest请求时,会选择性得更新HeightWatermark。

(2)Follower状态的Replica接收到来自Leader状态的Replica的FetchResponse时,会选择性更新HeightWatermark,即ReplicaFetcherThread内部的processPartitionData流程。

4.OffsetManager

4.1Kafka提供两种保存Consumer偏移量的方法:

(1)将偏移量保存到Zookeeper中。

(2)将偏移量保存至Kafka内部一个名为_consumer_offsets的Topic里面。

将偏移量保存至Zookeeper中是kafka一直就支持的,但是考虑到zookeeper并不太适合大批量的频繁写入 *** 作,因此kafka开始支持将Consumer的偏移量保存再Kafka内部的topic中,即_consumer_offsets Topic。当用户配置offsets.storage=kafka时,高级消费者会将偏移量保存至Topic里面,同时通过OffsetManager提供对这些偏移量的管理。

4.2 OffsetManager主要功能

  • 缓存最新的偏移量。
  • 提供对偏移量的查询。
  • Compact,保留最新的偏移量,以此来控制Topic日志的大小。

Kafka如何将Consumer Group 产生的偏移量信息保存在_consumer_offsets的不同分区?

    本质是通过计算不同Consumer Group的hash值和_consumer_offsets的分区数的模数,其结果作为指定分区的索引。

5.KafkaScheduler

    KafkaScheduler为其他模块提供定时任务的调度和管理,例如LogManager内部的cleanupLogs定时任务,flushDirtyLogs定时任务和checkpointRecoverPointOffsets定时任务;ReplicaManager模块内部的maybeShrinkIsr定时任务;OffsetManager内部的offsets-cache-compactor定时任务等等。KafkaScheduler内部是基于ScheduledThreadPoolExecutor实现的,对外封装了任务调度的接口schedule,线程个数由参数background.threads决定,默认值为10。

6.KafkaHealthcheck

    KafkaHealthcheck主要提供Broker Server健康状态的上报。Broker Server健康状态本质上就是指Broker Server是否在线,如果Broker Server在线,说明处于健康状态,如果Broker Server离线,说明处于死亡状态。

Broker Server如何上报健康状态?

    BrokerChangeListener通过监听目录为/brokers/ids的zookeeper路径,当发生有数据变化时,则获取当前目录下的数据,从而获取当前集群的在线Broker Server列表。而KafkaHealthcheck正是提供了在目录为/brokers/ids的Zookeeper路径上注册节点的能力,该节点所在路径为EphemeralPath(非永久路径),当Broker Server由于异常情况导致下线时,此EphemeralPath随着Broker Server和zookeeper链接的断开而消失。

7.TopicConfigManager

     kafka提供对topic配置参数的在线修改能力,修改完成之后无需重新启动kafka集群,在线生效。Topic配置参数包括:数据文件的大小,索引文件的大小,索引项的大小,索引项的粒度,日志文件保留的策略等等;

    Topic的配置参数位于路径为/config/topics/[topic]的zookeeper上,Broker Server内部为了避免针对每个Topic都在相关路径上建立监听器,对外提供了一个被通知的路径,其位于/brokers/config_changes,如果检测到该路径 上发生变化,则读取该路径上的数据,获取配置文件待更新的Topic,然后再从/config/topics/[topic]上加载最新的配置文件。

 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5668896.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存