kafka producer将消息发送给broker后,消息日志会被存储在broker的磁盘上,采用顺序写入的方式。顺序写可以加快磁盘访问速度,并且可以将将多个小型的逻辑写合并成一次大型的物理磁盘写入,官方数据显示顺序写比随机写入快6000倍以上。另外, *** 作系统使用内存对磁盘进行缓存即pagecache,pagecache完全由 *** 作系统管理,这也使得写数据变得即简洁也快速。
配置中可以调整过期时间,超过改时间的消息日志将移除,默认值为7天;也可配置文件大小阈值,达到该阈值后,从最旧消息开始删除。配置项为:
从文件到套接字的常见数据传输路径有4步:
1) *** 作系统从磁盘读取数据到内核空间的 pagecache
2)应用程序读取内核空间的数据到用户空间的缓冲区
3)应用程序将数据(用户空间的缓冲区)写回内核空间到套接字缓冲区(内核空间)
4) *** 作系统将数据从套接字缓冲区(内核空间)复制到通过网络发送的 NIC 缓冲区
kafka使用 producer ,broker 和 consumer 都共享的标准化的二进制消息格式,这样数据块不用修改就能在他们之间传递。kafka采用Linux 中系统调用sendfile的方式,直接将数据从 pagecache 转移到 socket 网络连接中。这种零拷贝方式使得kafka数据传输更加高效。
以前面文章中安装的kafka为例: Mac 安装kafka
kafka本地文件存储目录可以在配置文件serverproperties中设置,参数及默认值为:
进入该目录,可以看到kafka保存的cosumer offset和topic消息:
其中__consumer_offsets开头的为消费的offset信息,test1开头的即为之前创建的topic “test1”,该topic有三个分区,分区编号从0开始,分别是test1-0、test1-1、test1-2。
进入test1-0,查看包含文件如下:
可以看到kafka消息按partition存储的,每个partition一个目录。partition下消息分段(segment)存储,默认每段最大1G,通过参数logsegmentbytes可配置。segment包含索引文件index、消息文件log,分别存储消息的索引和内容,以index和log结尾,文件命名为当前segment第一个消息offset。index文件在log每隔一定数据量之间建立索引,可以通过参数indexintervalbytes配置。
通过kafka命令查看00000000000000000000index内容如下:
00000000000000000000log内容如下:
其中索引文件中包含两个字段:(offset,position),分别表示消息offset和该消息在log文件的偏移量。如上图中offset=0的消息对应的position=0;对应的就是00000000000000000000log中的第一条消息:
其中payload为具体的消息内容。
另外里面还有一个以"timeindex"结尾的文件,查看其内容:
该日志文件是kafka01011加入的,其中保存的为:(消息时间戳,offset)。时间戳是该segment最后一个消息对应的时间戳(与log文件中最后一条记录时间戳一致),kafka也支持根据时间来读取消息。
由上可知消息是按partition来存储的,partition可以配置n个副本followers。多个partition和其follower在broker上是怎么分配的呢?
partition和broker都进行了排序,下标从0开始;
假设有k个broker,第i个partition被分配到到 i%k 个broker上;
第i%k个broker即为partition i 的leader,负责这个partition的读写 ;
partition的followers也进行排序,从leader的后续broker开始分配,第i个partition的第j个副本broker为 (j+ i%k)%k。
一个有3个broker的kafka集群,包含3个partition,每个partition副本数为1的topic如下图:
总结:
kafka将消息日志采用顺序写入的方式存放在broker磁盘中;数据传输通过系统调用sendfile零拷贝方式;消息日志分段存放,可配置清除时间或大小阈值;每段包含消息索引、消息内容两个文件,通过索引实现快速查找;按照/topic/partition的目录结构分开存储,且均匀分布到集群各broker上。
参考:
>
以下命令中使用的zookeeper配置地址为127001:2181,bootstrap--server(即broker)地址为: 127001:9292
1,查看kafka topic列表,使用--list参数
bin/kafka-topicssh --zookeeper 127001:2181 --list
__consumer_offsets
lx_test_topic
test
2,查看kafka特定topic的详情,使用--topic与--describe参数
bin/kafka-topicssh --zookeeper 127001:2181 --topic lx_test_topic --describe
Topic:lx_test_topic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: lx_test_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
列出了lx_test_topic的parition数量、replica因子以及每个partition的leader、replica信息
3,查看consumer group列表,使用--list参数
查看consumer group列表有新、旧两种命令,分别查看新版(信息保存在broker中)consumer列表和老版(信息保存在zookeeper中)consumer列表,因而需要区分指定bootstrap--server和zookeeper参数:
bin/kafka-consumer-groupssh --new-consumer --bootstrap-server 127001:9292 --list
lx_test
bin/kafka-consumer-groupssh --zookeeper 127001:2181 --list
console-consumer-86845
console-consumer-11967
4,查看特定consumer group 详情,使用--group与--describe参数
同样根据新/旧版本的consumer,分别指定bootstrap-server与zookeeper参数:
bin/kafka-consumer-groupssh --new-consumer --bootstrap-server 127001:9292 --group lx_test --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
lx_test lx_test_topic 0 465 465 0 kafka-python-131_/127001
bin/kafka-consumer-groupssh --zookeeper 127001:2181 --group console-consumer-11967 --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
Could not fetch offset from zookeeper for group console-consumer-11967 partition [lx_test_topic,0] due to missing offset data in zookeeper
console-consumer-11967 lx_test_topic 0 unknown 465 unknown console-consumer-11967_aws-lx-1513787888172-d3a91f05-0
其中依次展示group名称、消费的topic名称、partition id、consumer group最后一次提交的offset、最后提交的生产消息offset、消费offset与生产offset之间的差值、当前消费topic-partition的group成员id(不一定包含hostname)
上面示例中console-consumer-11967是为了测试临时起的一个console consumer,缺少在zookeeper中保存的current_offset信息。
本文将介绍最常用的分布式消息中间件kafka。由于笔者水平受限,因此介绍不一定全面,也不会太深入,仅供参考。
如果启动时提示命令语法不正确,那么需要在kafka安装目录中找到bin\windows目录中的kafka-run-classbat,为set COMMAND后面的%CLASSPATH%加上双引号
注意22版本可以直接用--bootstrap-server替代--zookeeper
一条消息是一个record batch,包含record batch header,每条record又有各自的header
一个segment由index和log组成。index是索引文件,记录每条消息的offset和在log中的地址,log中存储具体的数据。segment大小固定,但是包含不同数目的消息,segment文件的命名由上一个segment的最后一条消息的offset决定。查询指定offset消息的过程是先通过二分查找找到对应的segment,然后在index文件中通过二分查找找到对应的存储地址。
compaction指对相同key的数据进行合并。
每个partition都有一个leader,若干个followers,读写请求发送给leader处理。leader维护了一个isr(in-sync replicas)列表,写数据时只有当指定数量的isr告知已收到(acknowledge)leader才会commit,而数据只有commit之后才会被消费者看到。告知已收到的数量可以由producer决定,包括0,1或者all(-1)
如果分区的当前leader挂掉了,会从isr列表中重新选举leader。如果列表中的所有节点都挂掉了,那么有以下几种策略
Kafka 消息是以主题为单位进行归类,各个主题之间是彼此独立的,互不影响。
每个主题⼜可以分为⼀个或多个分区。
每个分区各⾃存在⼀个记录消息数据的日志文件。
图中,创建了⼀个 tp_demo_01 主题,其存在6个 Parition,对应的每个Parition下存在⼀个 [Topic-Parition] 命名的消息⽇志⽂件。在理想情况下,数据流量分摊到各个 Parition 中,实现了负载均衡的效果。在分区日志文件中,你会发现很多类型的⽂件,比如: index、timestamp、log、snapshot 等。
其中,文件名⼀致的⽂件集合就称为 LogSement。
当满⾜如下⼏个条件中的其中之⼀,就会触发文件的切分:
偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。时间戳索引文件则根据时间戳查找对应的偏移量。
文件:
查看⼀个topic分区目录下的内容,发现有log、index和timeindex三个⽂件:
创建主题:
创建消息⽂件:
将⽂本消息⽣产到主题中:
查看存储⽂件:
如果想查看这些文件,可以使⽤kafka提供的shell来完成,几个关键信息如下:
(1)offset是逐渐增加的整数,每个offset对应⼀个消息的偏移量。
(2)position:消息批字节数,用于计算物理地址。
(3)CreateTime:时间戳。
(4)magic:2代表这个消息类型是V2,如果是0则代表是V0类型,1代表V1类型。
(5)compresscodec:None说明没有指定压缩类型,kafka目前提供了4种可选择,0-None、1-GZIP、2-snappy、3-lz4。
(6)crc:对所有字段进行校验后的crc值。
在偏移量索引文件中,索引数据都是顺序记录 offset ,但时间戳索引文件中每个追加的索引时间戳必须大于之前追加的索引项,否则不予追加。在 Kafka 01100 以后,消息元数据中存在若⼲的时间戳信息。如果 broker 端参数logmessagetimestamptype 设置为 LogAppendTIme ,那么时间戳必定能保持单调增⻓。反之如果是CreateTime 则⽆法保证顺序。
注意:timestamp文件中的 offset 与 index ⽂件中的 relativeOffset 不是⼀⼀对应的。因为数据的写⼊是各自追加。
思考:如何查看偏移量为23的消息?
Kafka 中存在⼀个 ConcurrentSkipListMap 来保存在每个日志分段,通过跳跃表方式,定位到在00000000000000000000index ,通过二分法在偏移量索引文件中找到不⼤于 23 的最⼤索引项,即 offset 20 那栏,然后从⽇志分段⽂件中的物理位置为320 开始顺序查找偏移量为 23 的消息。
在偏移量索引文件中,索引数据都是顺序记录 offset ,但时间戳索引⽂件中每个追加的索引时间戳必须大于之前追加的索引项,否则不予追加。在 Kafka 01100 以后,消息信息中存在若⼲的时间戳信息。
如果 broker 端参数logmessagetimestamptype 设置为 LogAppendTIme ,那么时间戳必定能保持单调增长。反之如果是CreateTime 则无法保证顺序。
通过时间戳方式进行查找消息,需要通过查找时间戳索引和偏移量索引两个文件。
时间戳索引索引格式:前⼋个字节表示时间戳,后四个字节表示偏移量。
思考:查找时间戳为 1557554753430 开始的消息?
Kafka 提供两种⽇志清理策略:
⽇志删除:按照⼀定的删除策略,将不满⾜条件的数据进⾏数据删除
⽇志压缩:针对每个消息的 Key 进⾏整合,对于有相同 Key 的不同 Value 值,只保留最后⼀个版本。
Kafka 提供 logcleanuppolicy 参数进⾏相应配置,默认值: delete ,还可以选择 compact 。
主题级别的配置项是 cleanuppolicy 。
基于时间
⽇志删除任务会根据 logretentionhours/logretentionminutes/logretentionms 设定⽇志保留的
时间节点。如果超过该设定值,就需要进⾏删除。默认是 7 天, logretentionms 优先级最⾼。
Kafka 依据⽇志分段中最⼤的时间戳进⾏定位。
⾸先要查询该⽇志分段所对应的时间戳索引⽂件,查找时间戳索引⽂件中最后⼀条索引项,若最后⼀条索引项的时间戳字段值⼤于 0,则取该值,否则取最近修改时间。
为什么不直接选最近修改时间呢?
因为日志文件可以有意⽆意的被修改,并不能真实的反应日志分段的最⼤时间信息。
删除过程
⽇志压缩是Kafka的⼀种机制,可以提供较为细粒度的记录保留,⽽不是基于粗粒度的基于时间的保留。
对于具有相同的Key,⽽数据不同,只保留最后⼀条数据,前⾯的数据在合适的情况下删除。
⽇志压缩特性,就实时计算来说,可以在异常容灾⽅⾯有很好的应⽤途径。⽐如,我们在Spark、Flink中做实时
计算时,需要⻓期在内存⾥⾯维护⼀些数据,这些数据可能是通过聚合了⼀天或者⼀周的⽇志得到的,这些数据⼀旦
由于异常因素(内存、⽹络、磁盘等)崩溃了,从头开始计算需要很⻓的时间。⼀个⽐较有效可⾏的⽅式就是定时将
内存⾥的数据备份到外部存储介质中,当崩溃出现时,再从外部存储介质中恢复并继续计算。
使⽤⽇志压缩来替代这些外部存储有哪些优势及好处呢?这⾥为⼤家列举并总结了⼏点:
Kafka即是数据源⼜是存储⼯具,可以简化技术栈,降低维护成本
使⽤外部存储介质的话,需要将存储的Key记录下来,恢复的时候再使⽤这些Key将数据取回,实现起来有⼀定的⼯程难度和复杂度。使⽤Kafka的⽇志压缩特性,只需要把数据写进Kafka,等异常出现恢复任务时再读
回到内存就可以了
Kafka对于磁盘的读写做了⼤量的优化⼯作,⽐如磁盘顺序读写。相对于外部存储介质没有索引查询等⼯作
量的负担,可以实现⾼性能。同时,Kafka的⽇志压缩机制可以充分利⽤廉价的磁盘,不⽤依赖昂贵的内存
来处理,在性能相似的情况下,实现⾮常⾼的性价⽐(这个观点仅仅针对于异常处理和容灾的场景来说)
主题的 cleanuppolicy 需要设置为compact。
Kafka的后台线程会定时将Topic遍历两次:
为了解决重试导致的消息重复、乱序问题,kafka引入了幂等消息。幂等消息保证producer在一次会话内写入一个partition内的消息具有幂等性,可以通过重试来确保消息发布的Exactly Once语义。
实现逻辑很简单:
producer每次启动后,首先向broker申请一个全局唯一的pid,用来标识本次会话。
message_v2 增加了sequence number字段,producer每发一批消息,seq就加1。
broker在内存维护(pid,seq)映射,收到消息后检查seq,如果,
producer在收到明确的的消息丢失ack,或者超时后未收到ack,要进行重试。
考虑在stream处理的场景中,需要多个消息的原子写入语义,要么全部写入成功,要么全部失败,这就是kafka事务消息要解决的问题。
事务消息是由producer、事务协调器、broker、组协调器、consumer共同参与实现的,
为producer指定固定的TransactionalId,可以穿越producer的多次会话(producer重启/断线重连)中,持续标识producer的身份。
使用epoch标识producer的每一次"重生",防止同一producer存在多个会话。
producer遵从幂等消息的行为,并在发送的recordbatch中增加事务id和epoch。
引入事务协调器,以两阶段提交的方式,实现消息的事务提交。
事务协调器使用一个特殊的topic:transaction,来做事务提交日志。
事务控制器通过RPC调研,协调 broker 和 consumer coordinator 实现事务的两阶段提交。
每一个broker都会启动一个事务协调器,使用hash(TransactionalId)确定producer对应的事务协调器,使得整个集群的负载均衡。
broker处理在事务协调器的commit/abort控制消息,把控制消息向正常消息一样写入topic(和正常消息交织在一起,用来确认事务提交的日志偏移),并向前推进消息提交偏移hw。
如果在事务过程中,提交了消费偏移,组协调器在offset log中写入事务消费偏移。当事务提交时,在offset log中写入事务offset确认消息。
consumer过滤未提交消息和事务控制消息,使这些消息对用户不可见。
有两种实现方式,
设置isolationlevel=read_uncommitted,此时topic的所有消息对consumer都可见。
consumer缓存这些消息,直到收到事务控制消息。若事务commit,则对外发布这些消息;若事务abort,则丢弃这些消息。
设置isolationlevel=read_committed,此时topic中未提交的消息对consumer不可见,只有在事务结束后,消息才对consumer可见。
broker给consumer的BatchRecord消息中,会包含以列表,指明哪些是"abort"事务,consumer丢弃abort事务的消息即可。
事务消息处理流程如图1所示,
图1 事务消息业务流程
流程说明:
事务协调器是分配pid和管理事务的核心,produer首先对任何一个broker发送FindCoordinatorRequest,发现自己的事务协调器。
紧接着,producer向事务协调器发送InitPidRequest,申请生成pid。
2a当指定了transactionalid时,事务协调器为producer分区pid,并更新epoch,把(tid,pid)的映射关系写入事务日志。 同时清理tid任何未完成的事务,丢弃未提交的消息。
启动事务是producer的本地 *** 作,促使producer更新内部状态,不会和事务协调器发生关系。
事务协调器自动启动事务,始终处在一个接一个的事务处理状态机中。
对于每一个要在事务中写消息的topic分区,producer应当在第一次发消息前,向事务处理器注册分区。
41a事务处理器把事务关联的分区写入事务日志。
在提交或终止事务时,事务协调器需要这些信息,控制事务涉及的所有分区leader完成事务提交或终止。
42a producer向分区leader写消息,消息中包含tid,pid,epoch和seq。
43 提交消费偏移 -- AddOffsetCommitsToTxnRequest
43a producer向事务协调器发送消费偏移,事务协调器在事务日志中记录偏移信息,并把组协调器返回给producer。
44a producer向组协调器发送TxnOffsetCommitRequest,组协调器把偏移信息写入偏移日志。但是,要一直等到事务提交后,这个偏移才生效,对外部可见。
收到提交或终止事务的请求时,事务处理器执行下面的 *** 作:
1 在事务日志中写入PREPARE_COMMIT或PREPARE_ABORT消息(51a)。
2 通过WriteTxnMarkerRequest向事务中的所有broker发事务控制消息(52)。
3 在事务之日中写入COMMITTED或ABORTED消息(53)。
这个消息由事务处理器发给事务中所涉及分区的leader。
当收到这个消息后,broker会在分区log中写入一个COMMIT或ABORT控制消息。同时,也会更新该分区的事务提交偏移hw。
如果事务中有提交消费偏移, broker也会把控制消息写入 __consumer-offsets log,并通知组协调器使事务中提交的消费偏移生效。
当所有的commit或abort消息写入数据日志,事务协调器在事务日志中写入事务日志,标志这事务结束。
至此,本事务的所有状态信息都可以被删除,可以开始一个新的事务。
在实现上,还有很多细节,比如,事务协调器会启动定时器,用来检测并终止开始后长时间不活动的事务,具体请参考下面列出的kafka社区技术文档。
总结:
我们要认识到,虽然kafka事务消息提供了多个消息原子写的保证,但它不保证原子读。
例如,
也就是说,虽然kafka log持久化了数据,也可以通过指定offset多次消费数据,但由于分区数据之间的无序性,导致每次处理输出的结果都是不同的。这使得kafka stream不能像hadoop批处理任务一样,可以随时重新执行,保证每次执行的结果相同。除非我们只从一个topic分区读数据。
[0] >
作为一款典型的消息中间件产品,kafka系统仍然由producer、broker、consumer三部分组成。kafka涉及的几个常用概念和组件简单介绍如下:
当consumer group的状态发生变化(如有consumer故障、增减consumer成员等)或consumer group消费的topic状态发生变化(如增加了partition,消费的topic发生变化),kafka集群会自动调整和重新分配consumer消费的partition,这个过程就叫做rebalance(再平衡)。
__consumer_offsets是kafka集群自己维护的一个特殊的topic,它里面存储的是每个consumer group已经消费了每个topic partition的offset。__consumer_offsets中offset消息的key由group id,topic name,partition id组成,格式为 {topic name}-${partition id},value值就是consumer提交的已消费的topic partition offset值。__consumer_offsets的分区数和副本数分别由offsetstopicnumpartitions(默认值为50)和offsetstopicreplicationfactor(默认值为1)参数配置。我们通过公式 hash(group id) % offsetstopicnumpartitions 就可以计算出指定consumer group的已提交offset存储的partition。由于consumer group提交的offset消息只有最后一条消息有意义,所以__consumer_offsets是一个compact topic,kafka集群会周期性的对__consumer_offsets执行compact *** 作,只保留最新的一次提交offset。
group coordinator运行在kafka某个broker上,负责consumer group内所有的consumer成员管理、所有的消费的topic的partition的消费关系分配、offset管理、触发rebalance等功能。group coordinator管理partition分配时,会指定consumer group内某个consumer作为group leader执行具体的partition分配任务。存储某个consumer group已提交offset的__consumer_offsets partition leader副本所在的broker就是该consumer group的协调器运行的broker。
跟大多数分布式系统一样,集群有一个master角色管理整个集群,协调集群中各个成员的行为。kafka集群中的controller就相当于其它分布式系统的master,用来负责集群topic的分区分配,分区leader选举以及维护集群的所有partition的ISR等集群协调功能。集群中哪个borker是controller也是通过一致性协议选举产生的,28版本之前通过zookeeper进行选主,28版本后通过kafka raft协议进行选举。如果controller崩溃,集群会重新选举一个broker作为新的controller,并增加controller epoch值(相当于zookeeper ZAB协议的epoch,raft协议的term值)
当kafka集群新建了topic或为一个topic新增了partition,controller需要为这些新增加的partition分配到具体的broker上,并把分配结果记录下来,供producer和consumer查询获取。
因为只有partition的leader副本才会处理producer和consumer的读写请求,而partition的其他follower副本需要从相应的leader副本同步消息,为了尽量保证集群中所有broker的负载是均衡的,controller在进行集群全局partition副本分配时需要使partition的分布情况是如下这样的:
在默认情况下,kafka采用轮询(round-robin)的方式分配partition副本。由于partition leader副本承担的流量比follower副本大,kafka会先分配所有topic的partition leader副本,使所有partition leader副本全局尽量平衡,然后再分配各个partition的follower副本。partition第一个follower副本的位置是相应leader副本的下一个可用broker,后面的副本位置依此类推。
举例来说,假设我们有两个topic,每个topic有两个partition,每个partition有两个副本,这些副本分别标记为1-1-1,1-1-2,1-2-1,1-2-2,2-1-1,2-1-2,2-2-1,2-2-2(编码格式为topic-partition-replia,编号均从1开始,第一个replica是leader replica,其他的是follower replica)。共有四个broker,编号是1-4。我们先对broker按broker id进行排序,然后分配leader副本,最后分配foller副本。
1)没有配置brokerrack的情况
现将副本1-1-1分配到broker 1,然后1-2-1分配到broker 2,依此类推,2-2-1会分配到broker 4。partition 1-1的leader副本分配在broker 1上,那么下一个可用节点是broker 2,所以将副本1-1-2分配到broker 2上。同理,partition 1-2的leader副本分配在broker 2上,那么下一个可用节点是broker 3,所以将副本1-1-2分配到broker 3上。依此类推分配其他的副本分片。最后分配的结果如下图所示:
2)配置了brokerrack的情况
假设配置了两个rack,broker 1和broker 2属于Rack 1,broker 3和broker 4属于Rack 2。我们对rack和rack内的broker分别排序。然后先将副本1-1-1分配到Rack 1的broker 1,然后将副本1-2-1分配到下一个Rack的第一个broker,即Rack 2的broker 3。其他的parttition leader副本依此类推。然后分配follower副本,partition 1-1的leader副本1-1-1分配在Rack 1的broker上,下一个可用的broker是Rack 2的broker 3,所以分配到broker 3上,其他依此类推。最后分配的结果如下图所示:
kafka除了按照集群情况自动分配副本,也提供了reassign工具人工分配和迁移副本到指定broker,这样用户可以根据集群实际的状态和各partition的流量情况分配副本
kafka集群controller的一项功能是在partition的副本中选择一个副本作为leader副本。在topic的partition创建时,controller首先分配的副本就是leader副本,这个副本又叫做preference leader副本。
当leader副本所在broker失效时(宕机或网络分区等),controller需要为在该broker上的有leader副本的所有partition重新选择一个leader,选择方法就是在该partition的ISR中选择第一个副本作为新的leader副本。但是,如果ISR成员只有一个,就是失效的leader自身,其余的副本都落后于leader怎么办?kafka提供了一个uncleanleaderelection配置参数,它的默认值为true。当uncleanleaderelection值为true时,controller还是会在非ISR副本中选择一个作为leader,但是这时候使用者需要承担数据丢失和数据不一致的风险。当uncleanleaderelection值为false时,则不会选择新的leader,该partition处于不可用状态,只能恢复失效的leader使partition重新变为可用。
当preference leader失效后,controller重新选择一个新的leader,但是preference leader又恢复了,而且同步上了新的leader,是ISR的成员,这时候preference leader仍然会成为实际的leader,原先的新leader变为follower。因为在partition leader初始分配时,使按照集群副本均衡规则进行分配的,这样做可以让集群尽量保持平衡。
为了保证topic的高可用,topic的partition往往有多个副本,所有的follower副本像普通的consumer一样不断地从相应的leader副本pull消息。每个partition的leader副本会维护一个ISR列表存储到集群信息库里,follower副本成为ISR成员或者说与leader是同步的,需要满足以下条件:
1)follower副本处于活跃状态,与zookeeper(28之前版本)或kafka raft master之间的心跳正常
2)follower副本最近replicalagtimemaxms(默认是10秒)时间内从leader同步过最新消息。需要注意的是,一定要拉取到最新消息,如果最近replicalagtimemaxms时间内拉取过消息,但不是最新的,比如落后follower在追赶leader过程中,也不会成为ISR。
follower在同步leader过程中,follower和leader都会维护几个参数,来表示他们之间的同步情况。leader和follower都会为自己的消息队列维护LEO(Last End Offset)和HW(High Watermark)。leader还会为每一个follower维护一个LEO。LEO表示leader或follower队列写入的最后一条消息的offset。HW表示的offset对应的消息写入了所有的ISR。当leader发现所有follower的LEO的最小值大于HW时,则会增加HW值到这个最小值LEO。follower拉取leader的消息时,同时能获取到leader维护的HW值,如果follower发现自己维护的HW值小于leader发送过来的HW值,也会增加本地的HW值到leader的HW值。这样我们可以得到一个不等式: follower HW <= leader HW <= follower LEO <= leader LEO 。HW对应的log又叫做committed log,consumer消费partititon的消息时,只能消费到offset值小于或等于HW值的消息的,由于这个原因,kafka系统又称为分布式committed log消息系统。
kafka的消息内容存储在logdirs参数配置的目录下。kafka每个partition的数据存放在本地磁盘logdirs目录下的一个单独的目录下,目录命名规范为 ${topicName}-${partitionId} ,每个partition由多个LogSegment组成,每个LogSegment由一个数据文件(命名规范为: {baseOffset}index)和一个时间戳索引文件(命名规范为:${baseOffset}timeindex)组成,文件名的baseOffset就是相应LogSegment中第一条消息的offset。index文件存储的是消息的offset到该消息在相应log文件中的偏移,便于快速在log文件中快速找到指定offset的消息。index是一个稀疏索引,每隔一定间隔大小的offset才会建立相应的索引(比如每间隔10条消息建立一个索引)。timeindex也是一个稀疏索引文件,这样可以根据消息的时间找到对应的消息。
可以考虑将消息日志存放到多个磁盘中,这样多个磁盘可以并发访问,增加消息读写的吞吐量。这种情况下,logdirs配置的是一个目录列表,kafka会根据每个目录下partition的数量,将新分配的partition放到partition数最少的目录下。如果我们新增了一个磁盘,你会发现新分配的partition都出现在新增的磁盘上。
kafka提供了两个参数logsegmentbytes和logsegmentms来控制LogSegment文件的大小。logsegmentbytes默认值是1GB,当LogSegment大小达到logsegmentbytes规定的阈值时,kafka会关闭当前LogSegment,生成一个新的LogSegment供消息写入,当前供消息写入的LogSegment称为活跃(Active)LogSegment。logsegmentms表示最大多长时间会生成一个新的LogSegment,logsegmentms没有默认值。当这两个参数都配置了值,kafka看哪个阈值先达到,触发生成新的LogSegment。
kafka还提供了logretentionms和logretentionbytes两个参数来控制消息的保留时间。当消息的时间超过了logretentionms配置的阈值(默认是168小时,也就是一周),则会被认为是过期的,会被kafka自动删除。或者是partition的总的消息大小超过了logretentionbytes配置的阈值时,最老的消息也会被kafka自动删除,使相应partition保留的总消息大小维持在logretentionbytes阈值以下。这个地方需要注意的是,kafka并不是以消息为粒度进行删除的,而是以LogSegment为粒度删除的。也就是说,只有当一个LogSegment的最后一条消息的时间超过logretentionms阈值时,该LogSegment才会被删除。这两个参数都配置了值时,也是只要有一个先达到阈值,就会执行相应的删除策略
当我们使用KafkaProducer向kafka发送消息时非常简单,只要构造一个包含消息key、value、接收topic信息的ProducerRecord对象就可以通过KafkaProducer的send()向kafka发送消息了,而且是线程安全的。KafkaProducer支持通过三种消息发送方式
KafkaProducer客户端虽然使用简单,但是一条消息从客户端到topic partition的日志文件,中间需要经历许多的处理过程。KafkaProducer的内部结构如下所示:
从图中可以看出,消息的发送涉及两类线程,一类是调用KafkaProducersend()方法的应用程序线程,因为KafkaProducersend()是多线程安全的,所以这样的线程可以有多个;另一类是与kafka集群通信,实际将消息发送给kafka集群的Sender线程,当我们创建一个KafkaProducer实例时,会创建一个Sender线程,通过该KafkaProducer实例发送的所有消息最终通过该Sender线程发送出去。RecordAccumulator则是一个消息队列,是应用程序线程与Sender线程之间消息传递的桥梁。当我们调用KafkaProducersend()方法时,消息并没有直接发送出去,只是写入了RecordAccumulator中相应的队列中,最终需要Sender线程在适当的时机将消息从RecordAccumulator队列取出来发送给kafka集群。
消息的发送过程如下:
在使用KafkaConsumer实例消费kafka消息时,有一个特性我们要特别注意,就是KafkaConsumer不是多线程安全的,KafkaConsumer方法都在调用KafkaConsumer的应用程序线程中运行(除了consumer向kafka集群发送的心跳,心跳在一个专门的单独线程中发送),所以我们调用KafkaConsumer的所有方法均需要保证在同一个线程中调用,除了KafkaConsumerwakeup()方法,它设计用来通过其它线程向consumer线程发送信号,从而终止consumer执行。
跟producer一样,consumer要与kafka集群通信,消费kafka消息,首先需要获取消费的topic partition leader replica所在的broker地址等信息,这些信息可以通过向kafka集群任意broker发送Metadata请求消息获取。
我们知道,一个consumer group有多个consumer,一个topic有多个partition,而且topic的partition在同一时刻只能被consumer group内的一个consumer消费,那么consumer在消费partition消息前需要先确定消费topic的哪个partition。partition的分配通过group coordinator来实现。基本过程如下:
我们可以通过实现接口orgapachekafkaclientsconsumerinternalsPartitionAssignor自定义partition分配策略,但是kafka已经提供了三种分配策略可以直接使用。
partition分配完后,每个consumer知道了自己消费的topic partition,通过metadata请求可以获取相应partition的leader副本所在的broker信息,然后就可以向broker poll消息了。但是consumer从哪个offset开始poll消息?所以consumer在第一次向broker发送FetchRequest poll消息之前需要向Group Coordinator发送OffsetFetchRequest获取消费消息的起始位置。Group Coordinator会通过key {topic}-${partition}查询 __consumer_offsets topic中是否有offset的有效记录,如果存在,则将consumer所属consumer group最近已提交的offset返回给consumer。如果没有(可能是该partition是第一次分配给该consumer group消费,也可能是该partition长时间没有被该consumer group消费),则根据consumer配置参数autooffsetreset值确定consumer消费的其实offset。如果autooffsetreset值为latest,表示从partition的末尾开始消费,如果值为earliest,则从partition的起始位置开始消费。当然,consumer也可以随时通过KafkaConsumerseek()方法人工设置消费的起始offset。
kafka broker在收到FetchRequest请求后,会使用请求中topic partition的offset查一个skiplist表(该表的节点key值是该partition每个LogSegment中第一条消息的offset值)确定消息所属的LogSegment,然后继续查LogSegment的稀疏索引表(存储在index文件中),确定offset对应的消息在LogSegment文件中的位置。为了提升消息消费的效率,consumer通过参数fetchminbytes和maxpartitionfetchbytes告诉broker每次拉取的消息总的最小值和每个partition的最大值(consumer一次会拉取多个partition的消息)。当kafka中消息较少时,为了让broker及时将消息返回给consumer,consumer通过参数fetchmaxwaitms告诉broker即使消息大小没有达到fetchminbytes值,在收到请求后最多等待fetchmaxwaitms时间后,也将当前消息返回给consumer。fetchminbytes默认值为1MB,待fetchmaxwaitms默认值为500ms。
为了提升消息的传输效率,kafka采用零拷贝技术让内核通过DMA把磁盘中的消息读出来直接发送到网络上。因为kafka写入消息时将消息写入内存中就返回了,如果consumer跟上了producer的写入速度,拉取消息时不需要读磁盘,直接从内存获取消息发送出去就可以了。
为了避免发生再平衡后,consumer重复拉取消息,consumer需要将已经消费完的消息的offset提交给group coordinator。这样发生再平衡后,consumer可以从上次已提交offset出继续拉取消息。
kafka提供了多种offset提交方式
partition offset提交和管理对kafka消息系统效率来说非常关键,它直接影响了再平衡后consumer是否会重复拉取消息以及重复拉取消息的数量。如果offset提交的比较频繁,会增加consumer和kafka broker的消息处理负载,降低消息处理效率;如果offset提交的间隔比较大,再平衡后重复拉取的消息就会比较多。还有比较重要的一点是,kafka只是简单的记录每次提交的offset值,把最后一次提交的offset值作为最新的已提交offset值,作为再平衡后消息的起始offset,而什么时候提交offset,每次提交的offset值具体是多少,kafka几乎不关心(这个offset对应的消息应该存储在kafka中,否则是无效的offset),所以应用程序可以先提交3000,然后提交2000,再平衡后从2000处开始消费,决定权完全在consumer这边。
kafka中的topic partition与consumer group中的consumer的消费关系其实是一种配对关系,当配对双方发生了变化时,kafka会进行再平衡,也就是重新确定这种配对关系,以提升系统效率、高可用性和伸缩性。当然,再平衡也会带来一些负面效果,比如在再平衡期间,consumer不能消费kafka消息,相当于这段时间内系统是不可用的。再平衡后,往往会出现消息的重复拉取和消费的现象。
触发再平衡的条件包括:
需要注意的是,kafka集群broker的增减或者topic partition leader重新选主这类集群状态的变化并不会触发在平衡
有两种情况与日常应用开发比较关系比较密切:
consumer在调用subscribe()方法时,支持传入一个ConsumerRebalanceListener监听器,ConsumerRebalanceListener提供了两个方法,onPartitionRevoked()方法在consumer停止消费之后,再平衡开始之前被执行。可以发现,这个地方是提交offset的好时机。onPartitonAssigned()方法则会在重新进行partition分配好了之后,但是新的consumer还未消费之前被执行。
我们在提到kafka时,首先想到的是它的吞吐量非常大,这也是很多人选择kafka作为消息传输组件的重要原因。
以下是保证kafka吞吐量大的一些设计考虑:
但是kafka是不是总是这么快?我们同时需要看到kafka为了追求快舍弃了一些特性:
所以,kafka在消息独立、允许少量消息丢失或重复、不关心消息顺序的场景下可以保证非常高的吞吐量,但是在需要考虑消息事务、严格保证消息顺序等场景下producer和consumer端需要进行复杂的考虑和处理,可能会比较大的降低kafka的吞吐量,例如对可靠性和保序要求比较高的控制类消息需要非常谨慎的权衡是否适合使用kafka。
我们通过producer向kafka集群发送消息,总是期望消息能被consumer成功消费到。最不能忍的是producer收到了kafka集群消息写入的正常响应,但是consumer仍然没有消费到消息。
kafka提供了一些机制来保证消息的可靠传递,但是有一些因素需要仔细权衡考虑,这些因素往往会影响kafka的吞吐量,需要在可靠性与吞吐量之间求得平衡:
kafka只保证partition消息顺序,不保证topic级别的顺序,而且保证的是partition写入顺序与读取顺序一致,不是业务端到端的保序。
如果对保序要求比较高,topic需要只设置一个partition。这时可以把参数maxinflightrequestsperconnection设置为1,而retries设置为大于1的数。这样即使发生了可恢复型错误,仍然能保证消息顺序,但是如果发生不可恢复错误,应用层进行重试的话,就无法保序了。也可以采用同步发送的方式,但是这样也极大的降低了吞吐量。如果消息携带了表示顺序的字段,可以在接收端对消息进行重新排序以保证最终的有序。
以上就是关于kafka消息的管理全部的内容,包括:kafka消息的管理、kafka-docker上使用+常用指令、Kafka查看topic、consumer group状态命令等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)